diff --git a/format_spec/filter_pipeline.md b/format_spec/filter_pipeline.md index dff1c640751..96ef8786793 100644 --- a/format_spec/filter_pipeline.md +++ b/format_spec/filter_pipeline.md @@ -42,12 +42,13 @@ The filter options are configuration parameters for the filters that do not chan ### Main Compressor Options -For the compression filters \(any of the filter types `TILEDB_FILTER_{GZIP,ZSTD,LZ4,RLE,BZIP2,DOUBLE_DELTA,DICTIONARY}`\) the filter options have internal format: +For the compression filters \(any of the filter types `TILEDB_FILTER_{GZIP,ZSTD,LZ4,RLE,BZIP2,DOUBLE_DELTA,DELTA,DICTIONARY}`\) the filter options have the following internal format: | **Field** | **Type** | **Description** | | :--- | :--- | :--- | | Compressor type | `uint8_t` | Type of compression \(e.g. `TILEDB_BZIP2`\) | | Compression level | `int32_t` | Compression level used \(ignored by some compressors\). | +| Reinterpret datatype | `uint8_t` | Datatype that the data is reinterpreted as prior to compression. Used only by `DOUBLE_DELTA` and `DELTA`. | ### Bit-width Reduction Options diff --git a/test/regression/targets/sc-24079.cc b/test/regression/targets/sc-24079.cc index 28b35ba3969..10e92ad6fd3 100644 --- a/test/regression/targets/sc-24079.cc +++ b/test/regression/targets/sc-24079.cc @@ -10,7 +10,7 @@ std::string array_name = "cpp_unit_array_24079"; TEST_CASE( "C++ API: DoubleDelta filter typecheck should account for output type of " "FloatScaleFilter", - "[cppapi][filter][float-scaling][!shouldfail]") { + "[cppapi][filter][float-scaling]") { tiledb::Context ctx; tiledb::VFS vfs(ctx); @@ -24,7 +24,7 @@ TEST_CASE( // Create and initialize dimension. auto d1 = tiledb::Dimension::create( - ctx, "soma_joinid", {{domain_lo, domain_hi}}, 2048); + ctx, "d1", {{domain_lo, domain_hi}}, 2048); tiledb::Filter float_scale(ctx, TILEDB_FILTER_SCALE_FLOAT); double scale = 1.0f; @@ -40,22 +40,55 @@ TEST_CASE( tiledb::FilterList filters(ctx); filters.add_filter(float_scale); filters.add_filter(dd); - d1.set_filter_list(filters); + d1.set_filter_list(filters); domain.add_dimension(d1); - auto a = tiledb::Attribute::create(ctx, "A"); + auto a1 = tiledb::Attribute::create(ctx, "a1"); + a1.set_filter_list(filters); tiledb::ArraySchema schema(ctx, TILEDB_SPARSE); schema.set_domain(domain); - schema.add_attribute(a); + schema.add_attribute(a1); schema.set_capacity(100000); schema.set_cell_order(TILEDB_ROW_MAJOR); schema.set_tile_order(TILEDB_ROW_MAJOR); CHECK_NOTHROW(tiledb::Array::create(array_name, schema)); + std::vector d1_data = { + 1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f, 9.0f, 10.0f}; + std::vector a1_data = { + 1.0f, 2.1f, 3.2f, 4.3f, 5.4f, 6.5f, 7.6f, 8.7f, 9.8f, 10.9f}; + + // Write to array. + { + tiledb::Array array(ctx, array_name, TILEDB_WRITE); + tiledb::Query query(ctx, array); + query.set_data_buffer("d1", d1_data); + query.set_data_buffer("a1", a1_data); + query.submit(); + CHECK(tiledb::Query::Status::COMPLETE == query.query_status()); + } + + // Read from array. + { + std::vector d1_read(10); + std::vector a1_read(10); + tiledb::Array array(ctx, array_name, TILEDB_READ); + tiledb::Query query(ctx, array); + query.set_subarray({domain_lo, domain_hi}); + query.set_data_buffer("a1", a1_read); + query.set_data_buffer("d1", d1_read); + query.submit(); + CHECK(tiledb::Query::Status::COMPLETE == query.query_status()); + CHECK( + std::vector{ + 1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 7.0f, 8.0f, 9.0f, 10.0f, 11.0f} == + a1_read); + CHECK(d1_data == d1_read); + } // Cleanup.
if (vfs.is_dir(array_name)) { vfs.remove_dir(array_name); } -} \ No newline at end of file +} diff --git a/test/src/unit-capi-sparse_array.cc b/test/src/unit-capi-sparse_array.cc index e415dca1d43..affeb8ac350 100644 --- a/test/src/unit-capi-sparse_array.cc +++ b/test/src/unit-capi-sparse_array.cc @@ -2486,6 +2486,14 @@ TEST_CASE_METHOD( TILEDB_ROW_MAJOR, TILEDB_COL_MAJOR); } + + SECTION("- delta compression, row/col-major") { + // TODO: refactor for each supported FS. + std::string temp_dir = fs_vec_[0]->temp_dir(); + array_name = temp_dir + ARRAY; + check_sorted_reads( + array_name, TILEDB_FILTER_DELTA, TILEDB_ROW_MAJOR, TILEDB_COL_MAJOR); + } } TEST_CASE_METHOD( diff --git a/test/src/unit-cppapi-webp-filter.cc b/test/src/unit-cppapi-webp-filter.cc index c071211fc35..259e668da30 100644 --- a/test/src/unit-cppapi-webp-filter.cc +++ b/test/src/unit-cppapi-webp-filter.cc @@ -254,7 +254,10 @@ TEMPLATE_LIST_TEST_CASE( // Create an invalid attribute for use with WebP filter. auto invalid_attr = Attribute::create(ctx, "rgb"); - invalid_attr.set_filter_list(filterList); + REQUIRE_THROWS_WITH( + invalid_attr.set_filter_list(filterList), + Catch::Matchers::ContainsSubstring( + "Filter WEBP does not accept input type")); // WebP filter requires exactly 2 dimensions for Y, X. { @@ -299,18 +302,6 @@ TEMPLATE_LIST_TEST_CASE( "In dense arrays, all dimensions must have the same datatype")); } - // WebP filter supports only uint8 attributes. - { - ArraySchema invalid_schema(ctx, TILEDB_DENSE); - invalid_schema.set_domain(valid_domain); - - invalid_schema.add_attribute(invalid_attr); - REQUIRE_THROWS_WITH( - Array::create(webp_array_name, invalid_schema), - Catch::Matchers::ContainsSubstring( - "WebP filter supports only uint8 attributes")); - } - // WebP filter can only be applied to dense arrays. 
{ ArraySchema invalid_schema(ctx, TILEDB_SPARSE); diff --git a/test/src/unit-filter-pipeline.cc b/test/src/unit-filter-pipeline.cc index 7aab4a16eb4..f045db02e5b 100644 --- a/test/src/unit-filter-pipeline.cc +++ b/test/src/unit-filter-pipeline.cc @@ -54,6 +54,7 @@ #include "tiledb/sm/filter/filter_pipeline.h" #include "tiledb/sm/filter/float_scaling_filter.h" #include "tiledb/sm/filter/positive_delta_filter.h" +#include "tiledb/sm/filter/webp_filter.h" #include "tiledb/sm/filter/xor_filter.h" #include "tiledb/sm/tile/tile.h" #include "tiledb/stdx/utility/to_underlying.h" @@ -143,8 +144,8 @@ void run_reverse( class Add1InPlace : public tiledb::sm::Filter { public: // Just use a dummy filter type - Add1InPlace() - : Filter(FilterType::FILTER_NONE) { + Add1InPlace(Datatype filter_data_type) + : Filter(FilterType::FILTER_NONE, filter_data_type) { } void dump(FILE* out) const override { @@ -203,7 +204,7 @@ class Add1InPlace : public tiledb::sm::Filter { } Add1InPlace* clone_impl() const override { - return new Add1InPlace(); + return new Add1InPlace(filter_data_type_); } }; @@ -214,8 +215,8 @@ class Add1InPlace : public tiledb::sm::Filter { class Add1OutOfPlace : public tiledb::sm::Filter { public: // Just use a dummy filter type - Add1OutOfPlace() - : Filter(FilterType::FILTER_NONE) { + Add1OutOfPlace(Datatype filter_data_type) + : Filter(FilterType::FILTER_NONE, filter_data_type) { } void dump(FILE* out) const override { @@ -295,7 +296,7 @@ class Add1OutOfPlace : public tiledb::sm::Filter { } Add1OutOfPlace* clone_impl() const override { - return new Add1OutOfPlace(); + return new Add1OutOfPlace(filter_data_type_); } }; @@ -306,8 +307,8 @@ class Add1OutOfPlace : public tiledb::sm::Filter { class AddNInPlace : public tiledb::sm::Filter { public: // Just use a dummy filter type - AddNInPlace() - : Filter(FilterType::FILTER_NONE) { + AddNInPlace(Datatype filter_data_type) + : Filter(FilterType::FILTER_NONE, filter_data_type) { increment_ = 1; } @@ -374,7 +375,7 @@ class AddNInPlace : public tiledb::sm::Filter { } AddNInPlace* clone_impl() const override { - auto clone = new AddNInPlace(); + auto clone = new AddNInPlace(filter_data_type_); clone->increment_ = increment_; return clone; } @@ -390,8 +391,8 @@ class AddNInPlace : public tiledb::sm::Filter { class PseudoChecksumFilter : public tiledb::sm::Filter { public: // Just use a dummy filter type - PseudoChecksumFilter() - : Filter(FilterType::FILTER_NONE) { + PseudoChecksumFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_NONE, filter_data_type) { } void dump(FILE* out) const override { @@ -469,7 +470,7 @@ class PseudoChecksumFilter : public tiledb::sm::Filter { } PseudoChecksumFilter* clone_impl() const override { - return new PseudoChecksumFilter(); + return new PseudoChecksumFilter(filter_data_type_); } }; @@ -481,8 +482,8 @@ class PseudoChecksumFilter : public tiledb::sm::Filter { class Add1IncludingMetadataFilter : public tiledb::sm::Filter { public: // Just use a dummy filter type - Add1IncludingMetadataFilter() - : Filter(FilterType::FILTER_NONE) { + Add1IncludingMetadataFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_NONE, filter_data_type) { } void dump(FILE* out) const override { @@ -600,7 +601,7 @@ class Add1IncludingMetadataFilter : public tiledb::sm::Filter { } Add1IncludingMetadataFilter* clone_impl() const override { - return new Add1IncludingMetadataFilter; + return new Add1IncludingMetadataFilter(filter_data_type_); } }; @@ -756,7 +757,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - 
pipeline.add_filter(Add1InPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); SECTION("- Single stage") { CHECK( @@ -804,8 +805,8 @@ TEST_CASE( SECTION("- Multi-stage") { // Add a few more +1 filters and re-run. - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1InPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); CHECK( pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -891,7 +892,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1InPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); SECTION("- Single stage") { WriterTile::set_max_tile_chunk_size(80); @@ -945,8 +946,8 @@ TEST_CASE( SECTION("- Multi-stage") { // Add a few more +1 filters and re-run. WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1InPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1008,7 +1009,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1OutOfPlace()); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); SECTION("- Single stage") { CHECK( @@ -1056,8 +1057,8 @@ TEST_CASE( SECTION("- Multi-stage") { // Add a few more +1 filters and re-run. - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(Add1OutOfPlace()); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); CHECK( pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -1143,7 +1144,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1OutOfPlace()); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); SECTION("- Single stage") { WriterTile::set_max_tile_chunk_size(80); @@ -1197,8 +1198,8 @@ TEST_CASE( SECTION("- Multi-stage") { // Add a few more +1 filters and re-run. 
WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(Add1OutOfPlace()); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1260,10 +1261,10 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1OutOfPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); CHECK(tile.size() == 0); @@ -1347,10 +1348,10 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1OutOfPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1423,9 +1424,10 @@ TEST_CASE("Filter: Test compression", "[filter][compression]") { ThreadPool tp(4); SECTION("- Simple") { - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(CompressionFilter(tiledb::sm::Compressor::LZ4, 5)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); CHECK( pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -1446,8 +1448,9 @@ TEST_CASE("Filter: Test compression", "[filter][compression]") { } SECTION("- With checksum stage") { - pipeline.add_filter(PseudoChecksumFilter()); - pipeline.add_filter(CompressionFilter(tiledb::sm::Compressor::LZ4, 5)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); CHECK( pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -1468,10 +1471,11 @@ TEST_CASE("Filter: Test compression", "[filter][compression]") { } SECTION("- With multiple stages") { - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(PseudoChecksumFilter()); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(CompressionFilter(tiledb::sm::Compressor::LZ4, 5)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); CHECK( pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -1547,9 +1551,10 @@ TEST_CASE("Filter: Test compression var", "[filter][compression][var]") { SECTION("- Simple") { WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(CompressionFilter(tiledb::sm::Compressor::LZ4, 5)); + 
pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1573,8 +1578,9 @@ TEST_CASE("Filter: Test compression var", "[filter][compression][var]") { SECTION("- With checksum stage") { WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(PseudoChecksumFilter()); - pipeline.add_filter(CompressionFilter(tiledb::sm::Compressor::LZ4, 5)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1598,10 +1604,11 @@ TEST_CASE("Filter: Test compression var", "[filter][compression][var]") { SECTION("- With multiple stages") { WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(PseudoChecksumFilter()); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(CompressionFilter(tiledb::sm::Compressor::LZ4, 5)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1636,7 +1643,7 @@ TEST_CASE("Filter: Test pseudo-checksum", "[filter][pseudo-checksum]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(PseudoChecksumFilter()); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); SECTION("- Single stage") { CHECK( @@ -1690,9 +1697,9 @@ TEST_CASE("Filter: Test pseudo-checksum", "[filter][pseudo-checksum]") { } SECTION("- Multi-stage") { - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(PseudoChecksumFilter()); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); CHECK( pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -1798,7 +1805,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(PseudoChecksumFilter()); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); SECTION("- Single stage") { WriterTile::set_max_tile_chunk_size(80); @@ -1858,9 +1865,9 @@ TEST_CASE( SECTION("- Multi-stage") { WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1OutOfPlace()); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(PseudoChecksumFilter()); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) .ok()); @@ -1934,9 +1941,9 @@ TEST_CASE("Filter: Test pipeline modify filter", "[filter][modify]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(AddNInPlace()); - pipeline.add_filter(Add1InPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(AddNInPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); // Get non-existent filter instance auto* cksum = 
pipeline.get_filter(); @@ -2026,9 +2033,9 @@ TEST_CASE("Filter: Test pipeline modify filter var", "[filter][modify][var]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(AddNInPlace()); - pipeline.add_filter(Add1InPlace()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(AddNInPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); // Get non-existent filter instance auto* cksum = pipeline.get_filter(); @@ -2100,10 +2107,10 @@ TEST_CASE("Filter: Test pipeline copy", "[filter][copy]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(AddNInPlace()); - pipeline.add_filter(Add1InPlace()); - pipeline.add_filter(PseudoChecksumFilter()); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(AddNInPlace(Datatype::UINT64)); + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); // Modify +N filter auto* add_n = pipeline.get_filter(); @@ -2182,25 +2189,28 @@ TEST_CASE("Filter: Test random pipeline", "[filter][random]") { // List of potential filters to use. All of these filters can occur anywhere // in the pipeline. std::vector> constructors = { - []() { return new Add1InPlace(); }, - []() { return new Add1OutOfPlace(); }, - []() { return new Add1IncludingMetadataFilter(); }, - []() { return new BitWidthReductionFilter(); }, - []() { return new BitshuffleFilter(); }, - []() { return new ByteshuffleFilter(); }, - []() { return new CompressionFilter(tiledb::sm::Compressor::BZIP2, -1); }, - []() { return new PseudoChecksumFilter(); }, - []() { return new ChecksumMD5Filter(); }, - []() { return new ChecksumSHA256Filter(); }, + []() { return new Add1InPlace(Datatype::UINT64); }, + []() { return new Add1OutOfPlace(Datatype::UINT64); }, + []() { return new Add1IncludingMetadataFilter(Datatype::UINT64); }, + []() { return new BitWidthReductionFilter(Datatype::UINT64); }, + []() { return new BitshuffleFilter(Datatype::UINT64); }, + []() { return new ByteshuffleFilter(Datatype::UINT64); }, + []() { + return new CompressionFilter( + tiledb::sm::Compressor::BZIP2, -1, Datatype::UINT64); + }, + []() { return new PseudoChecksumFilter(Datatype::UINT64); }, + []() { return new ChecksumMD5Filter(Datatype::UINT64); }, + []() { return new ChecksumSHA256Filter(Datatype::UINT64); }, [&encryption_key]() { - return new EncryptionAES256GCMFilter(encryption_key); + return new EncryptionAES256GCMFilter(encryption_key, Datatype::UINT64); }, }; // List of potential filters that must occur at the beginning of the pipeline. std::vector> constructors_first = { // Pos-delta would (correctly) return error after e.g. compression. 
- []() { return new PositiveDeltaFilter(); }}; + []() { return new PositiveDeltaFilter(Datatype::UINT64); }}; ThreadPool tp(4); for (int i = 0; i < 100; i++) { @@ -2263,7 +2273,7 @@ TEST_CASE( // MD5 FilterPipeline md5_pipeline; ThreadPool tp(4); - ChecksumMD5Filter md5_filter; + ChecksumMD5Filter md5_filter(Datatype::UINT64); md5_pipeline.add_filter(md5_filter); CHECK(md5_pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp) .ok()); @@ -2284,7 +2294,7 @@ TEST_CASE( auto tile2 = make_increasing_tile(nelts); FilterPipeline sha_256_pipeline; - ChecksumMD5Filter sha_256_filter; + ChecksumMD5Filter sha_256_filter(Datatype::UINT64); sha_256_pipeline.add_filter(sha_256_filter); CHECK( sha_256_pipeline.run_forward(&test::g_helper_stats, &tile2, nullptr, &tp) @@ -2310,7 +2320,7 @@ TEST_CASE("Filter: Test bit width reduction", "[filter][bit-width-reduction]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(BitWidthReductionFilter()); + pipeline.add_filter(BitWidthReductionFilter(Datatype::UINT64)); SECTION("- Single stage") { auto tile = make_increasing_tile(nelts); @@ -2521,7 +2531,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(BitWidthReductionFilter()); + pipeline.add_filter(BitWidthReductionFilter(Datatype::UINT64)); SECTION("- Single stage") { auto tile = make_increasing_tile(nelts); @@ -2755,7 +2765,7 @@ TEST_CASE("Filter: Test positive-delta encoding", "[filter][positive-delta]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(PositiveDeltaFilter()); + pipeline.add_filter(PositiveDeltaFilter(Datatype::UINT64)); SECTION("- Single stage") { auto tile = make_increasing_tile(nelts); @@ -2874,7 +2884,7 @@ TEST_CASE( FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(PositiveDeltaFilter()); + pipeline.add_filter(PositiveDeltaFilter(Datatype::UINT64)); SECTION("- Single stage") { auto tile = make_increasing_tile(nelts); @@ -3000,7 +3010,7 @@ TEST_CASE("Filter: Test bitshuffle", "[filter][bitshuffle]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(BitshuffleFilter()); + pipeline.add_filter(BitshuffleFilter(Datatype::UINT64)); SECTION("- Single stage") { CHECK( @@ -3088,7 +3098,7 @@ TEST_CASE("Filter: Test bitshuffle var", "[filter][bitshuffle][var]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(BitshuffleFilter()); + pipeline.add_filter(BitshuffleFilter(Datatype::UINT64)); SECTION("- Single stage") { WriterTile::set_max_tile_chunk_size(80); @@ -3151,7 +3161,7 @@ TEST_CASE("Filter: Test byteshuffle", "[filter][byteshuffle]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(ByteshuffleFilter()); + pipeline.add_filter(ByteshuffleFilter(Datatype::UINT64)); SECTION("- Single stage") { CHECK( @@ -3239,7 +3249,7 @@ TEST_CASE("Filter: Test byteshuffle var", "[filter][byteshuffle][var]") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(ByteshuffleFilter()); + pipeline.add_filter(ByteshuffleFilter(Datatype::UINT64)); SECTION("- Single stage") { WriterTile::set_max_tile_chunk_size(80); @@ -3303,7 +3313,7 @@ TEST_CASE("Filter: Test encryption", "[filter][encryption]") { SECTION("- AES-256-GCM") { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(EncryptionAES256GCMFilter()); + pipeline.add_filter(EncryptionAES256GCMFilter(Datatype::UINT64)); // No key set CHECK( @@ -3364,7 +3374,6 @@ void testing_float_scaling_filter() { const uint64_t nelts = 100; const uint64_t tile_size = nelts * sizeof(FloatingType); const uint64_t 
cell_size = sizeof(FloatingType); - ; Datatype t = Datatype::FLOAT32; switch (sizeof(FloatingType)) { @@ -3408,7 +3417,7 @@ void testing_float_scaling_filter() { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(FloatScalingFilter()); + pipeline.add_filter(FloatScalingFilter(t)); CHECK(pipeline.get_filter() ->set_option(FilterOption::SCALE_FLOAT_BYTEWIDTH, &byte_width) .ok()); @@ -3461,7 +3470,6 @@ void testing_xor_filter(Datatype t) { const uint64_t nelts = 100; const uint64_t tile_size = nelts * sizeof(T); const uint64_t cell_size = sizeof(T); - ; WriterTile tile(constants::format_version, t, cell_size, tile_size); @@ -3480,7 +3488,7 @@ void testing_xor_filter(Datatype t) { FilterPipeline pipeline; ThreadPool tp(4); - pipeline.add_filter(XORFilter()); + pipeline.add_filter(XORFilter(t)); CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); @@ -3523,3 +3531,323 @@ TEST_CASE("Filter: Test XOR", "[filter][xor]") { testing_xor_filter(Datatype::DATETIME_FS); testing_xor_filter(Datatype::DATETIME_AS); } + +TEST_CASE("Filter: Pipeline filtered output types", "[filter][pipeline]") { + FilterPipeline pipeline; + + SECTION("- DoubleDelta filter reinterprets float->int32") { + pipeline.add_filter(CompressionFilter( + tiledb::sm::Compressor::DOUBLE_DELTA, + 0, + Datatype::FLOAT32, + Datatype::INT32)); + pipeline.add_filter(BitWidthReductionFilter(Datatype::INT32)); + } + + SECTION("- Delta filter reinterprets float->int32") { + pipeline.add_filter(CompressionFilter( + tiledb::sm::Compressor::DELTA, 0, Datatype::FLOAT32, Datatype::INT32)); + pipeline.add_filter(BitWidthReductionFilter(Datatype::INT32)); + } + + SECTION("- FloatScale filter converts float->int32") { + pipeline.add_filter( + FloatScalingFilter(sizeof(int32_t), 1.0f, 0.0f, Datatype::FLOAT32)); + pipeline.add_filter(PositiveDeltaFilter(Datatype::INT32)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::DELTA, 0, Datatype::INT32)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::BZIP2, 2, Datatype::INT32)); + pipeline.add_filter(BitshuffleFilter(Datatype::INT32)); + pipeline.add_filter(ByteshuffleFilter(Datatype::INT32)); + pipeline.add_filter(BitWidthReductionFilter(Datatype::INT32)); + } + + size_t byte_width = 0; + SECTION("- XOR filter expected output types") { + byte_width = GENERATE( + sizeof(int8_t), sizeof(int16_t), sizeof(int32_t), sizeof(int64_t)); + pipeline.add_filter( + FloatScalingFilter(byte_width, 1.0f, 0.0f, Datatype::FLOAT32)); + auto byte_width_t = + pipeline.get_filter()->output_datatype( + Datatype::FLOAT32); + pipeline.add_filter(XORFilter(byte_width_t)); + } + + SECTION("- XOR filter expected output types large pipeline") { + byte_width = GENERATE( + sizeof(int8_t), sizeof(int16_t), sizeof(int32_t), sizeof(int64_t)); + pipeline.add_filter( + FloatScalingFilter(byte_width, 1.0f, 0.0f, Datatype::FLOAT32)); + auto byte_width_t = + pipeline.get_filter()->output_datatype( + Datatype::FLOAT32); + pipeline.add_filter(PositiveDeltaFilter(byte_width_t)); + pipeline.add_filter(BitshuffleFilter(byte_width_t)); + pipeline.add_filter(ByteshuffleFilter(byte_width_t)); + pipeline.add_filter(XORFilter(byte_width_t)); + } + + // Initial type of tile is float. 
+ std::vector data = { + 1.0f, 2.1f, 3.2f, 4.3f, 5.4f, 6.5f, 7.6f, 8.7f, 9.8f, 10.9f}; + WriterTile tile( + constants::format_version, + Datatype::FLOAT32, + sizeof(float), + sizeof(float) * data.size()); + for (size_t i = 0; i < data.size(); i++) { + CHECK(tile.write(&data[i], i * sizeof(float), sizeof(float)).ok()); + } + + ThreadPool tp(4); + REQUIRE( + pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); + CHECK(tile.size() == 0); + CHECK(tile.filtered_buffer().size() != 0); + + auto unfiltered_tile = create_tile_for_unfiltering(data.size(), tile); + ChunkData chunk_data; + unfiltered_tile.load_chunk_data(chunk_data); + REQUIRE(pipeline + .run_reverse( + &test::g_helper_stats, + &unfiltered_tile, + nullptr, + chunk_data, + 0, + chunk_data.filtered_chunks_.size(), + tp.concurrency_level(), + tiledb::sm::Config()) + .ok()); + std::vector results{ + 1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 7.0f, 8.0f, 9.0f, 10.0f, 11.0f}; + for (size_t i = 0; i < data.size(); i++) { + float val = 0; + CHECK(unfiltered_tile.read(&val, i * sizeof(float), sizeof(float)).ok()); + if (pipeline.has_filter(tiledb::sm::FilterType::FILTER_SCALE_FLOAT)) { + // Loss of precision from rounding in FloatScale filter. + CHECK(val == results[i]); + } else { + CHECK(val == data[i]); + } + } +} + +TEST_CASE( + "C++ API: Pipeline with filtered type conversions", + "[cppapi][filter][pipeline]") { + tiledb::Context ctx; + tiledb::VFS vfs(ctx); + std::string array_name = "cpp_test_array"; + if (vfs.is_dir(array_name)) { + vfs.remove_dir(array_name); + } + + tiledb::Domain domain(ctx); + float domain_lo = static_cast(std::numeric_limits::min()); + float domain_hi = static_cast(std::numeric_limits::max() - 1); + + // Create and initialize dimension. + auto d1 = tiledb::Dimension::create( + ctx, "d1", {{domain_lo, domain_hi}}, 2048); + + tiledb::Filter float_scale(ctx, TILEDB_FILTER_SCALE_FLOAT); + double scale = 1.0f; + double offset = 0.0f; + uint64_t byte_width = sizeof(int32_t); + + // Float scale converting tile data from float->int32 + float_scale.set_option(TILEDB_SCALE_FLOAT_BYTEWIDTH, &byte_width); + float_scale.set_option(TILEDB_SCALE_FLOAT_FACTOR, &scale); + float_scale.set_option(TILEDB_SCALE_FLOAT_OFFSET, &offset); + + // Delta filter reinterprets int32->uint32 + tiledb::Filter delta(ctx, TILEDB_FILTER_DELTA); + + // Pass uint32 data to BitWidthReduction filter + tiledb::Filter bit_width_reduction(ctx, TILEDB_FILTER_BIT_WIDTH_REDUCTION); + + tiledb::FilterList filters(ctx); + filters.add_filter(float_scale); + filters.add_filter(delta); + filters.add_filter(bit_width_reduction); + + // Apply filters to both attribute and dimension. + REQUIRE_NOTHROW(d1.set_filter_list(filters)); + domain.add_dimension(d1); + + auto a1 = tiledb::Attribute::create(ctx, "a1"); + REQUIRE_NOTHROW(a1.set_filter_list(filters)); + + tiledb::ArraySchema schema(ctx, TILEDB_SPARSE); + schema.set_domain(domain); + schema.add_attribute(a1); + schema.set_cell_order(TILEDB_ROW_MAJOR); + schema.set_tile_order(TILEDB_ROW_MAJOR); + REQUIRE_NOTHROW(tiledb::Array::create(array_name, schema)); + std::vector d1_data = { + 1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f, 9.0f, 10.0f}; + std::vector a1_data = { + 1.0f, 2.1f, 3.2f, 4.3f, 5.4f, 6.5f, 7.6f, 8.7f, 9.8f, 10.9f}; + + // Write to array. 
+ { + tiledb::Array array(ctx, array_name, TILEDB_WRITE); + tiledb::Query query(ctx, array); + query.set_data_buffer("d1", d1_data); + query.set_data_buffer("a1", a1_data); + query.submit(); + CHECK(tiledb::Query::Status::COMPLETE == query.query_status()); + } + + // Read from array. + { + std::vector d1_read(10); + std::vector a1_read(10); + tiledb::Array array(ctx, array_name, TILEDB_READ); + tiledb::Query query(ctx, array); + query.set_subarray({domain_lo, domain_hi}); + query.set_data_buffer("a1", a1_read); + query.set_data_buffer("d1", d1_read); + query.submit(); + CHECK(tiledb::Query::Status::COMPLETE == query.query_status()); + // Some loss of precision from rounding in FloatScale. + CHECK( + std::vector{ + 1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 7.0f, 8.0f, 9.0f, 10.0f, 11.0f} == + a1_read); + CHECK(d1_data == d1_read); + } + + // Cleanup. + if (vfs.is_dir(array_name)) { + vfs.remove_dir(array_name); + } +} + +TEST_CASE( + "C++ API: Filter pipeline validation", + "[cppapi][filter][pipeline][validation]") { + tiledb::Context ctx; + + tiledb::Domain domain(ctx); + float domain_lo = static_cast(std::numeric_limits::min()); + float domain_hi = static_cast(std::numeric_limits::max() - 1); + auto d1 = tiledb::Dimension::create( + ctx, "d1", {{domain_lo, domain_hi}}, 2048); + auto a1 = tiledb::Attribute::create(ctx, "a1"); + + // FloatScale used for testing different float->integral pipelines. + tiledb::Filter float_scale(ctx, TILEDB_FILTER_SCALE_FLOAT); + double scale = 1.0f; + double offset = 0.0f; + uint64_t byte_width = sizeof(int32_t); + // Float scale converting tile data from float->int32 + float_scale.set_option(TILEDB_SCALE_FLOAT_BYTEWIDTH, &byte_width); + float_scale.set_option(TILEDB_SCALE_FLOAT_FACTOR, &scale); + float_scale.set_option(TILEDB_SCALE_FLOAT_OFFSET, &offset); + + tiledb::FilterList filters(ctx); + SECTION("- FloatScale filter accepts float or double byte width input") { + auto d2 = tiledb::Dimension::create(ctx, "d2", {{1, 100}}, 10); + auto a2 = tiledb::Attribute::create(ctx, "a2"); + filters.add_filter(float_scale); + CHECK_THROWS(d2.set_filter_list(filters)); + CHECK_NOTHROW(a2.set_filter_list(filters)); + } + + SECTION("- Delta filters do not accept real datatypes") { + auto test_filter = GENERATE( + TILEDB_FILTER_POSITIVE_DELTA, + TILEDB_FILTER_DOUBLE_DELTA, + TILEDB_FILTER_DELTA); + tiledb::Filter delta_filter(ctx, test_filter); + filters.add_filter(delta_filter); + // Delta compressors don't accept floats. Should fail without FloatScale. + CHECK_THROWS(d1.set_filter_list(filters)); + CHECK_THROWS(a1.set_filter_list(filters)); + + // Test using FloatScale to convert to integral is accepted. + tiledb::FilterList filters2(ctx); + filters2.add_filter(float_scale); + filters2.add_filter(delta_filter); + CHECK_NOTHROW(d1.set_filter_list(filters2)); + CHECK_NOTHROW(a1.set_filter_list(filters2)); + } + + SECTION("- Webp filter supports only uint8 attributes") { + if (webp_filter_exists) { + tiledb::Filter webp(ctx, TILEDB_FILTER_WEBP); + filters.add_filter(webp); + CHECK_THROWS(d1.set_filter_list(filters)); + CHECK_THROWS(a1.set_filter_list(filters)); + } + } + + SECTION("- Bit width reduction filter supports integral input") { + tiledb::Filter bit_width_reduction(ctx, TILEDB_FILTER_BIT_WIDTH_REDUCTION); + filters.add_filter(bit_width_reduction); + CHECK_THROWS(d1.set_filter_list(filters)); + CHECK_THROWS(a1.set_filter_list(filters)); + + // Test using FloatScale to convert to integral is accepted. 
+ tiledb::FilterList filters2(ctx); + filters2.add_filter(float_scale); + filters2.add_filter(bit_width_reduction); + CHECK_NOTHROW(d1.set_filter_list(filters2)); + CHECK_NOTHROW(a1.set_filter_list(filters2)); + } + + SECTION("- XOR filter interprets datatype as integral") { + // Datatype byte size must match size of int8, int16, int32, or int64 + tiledb::Filter xor_filter(ctx, TILEDB_FILTER_XOR); + filters.add_filter(xor_filter); + CHECK_NOTHROW(d1.set_filter_list(filters)); + CHECK_NOTHROW(a1.set_filter_list(filters)); + } + + SECTION("- Multiple compressors") { + tiledb::Filter bzip(ctx, TILEDB_FILTER_BZIP2); + auto compressor = GENERATE( + TILEDB_FILTER_GZIP, + TILEDB_FILTER_LZ4, + TILEDB_FILTER_RLE, + TILEDB_FILTER_ZSTD); + tiledb::Filter compressor_filter(ctx, compressor); + filters.add_filter(bzip); + filters.add_filter(compressor_filter); + + CHECK_NOTHROW(d1.set_filter_list(filters)); + CHECK_NOTHROW(a1.set_filter_list(filters)); + + // Should throw without FloatScale to convert float->int32. + auto delta_compressor = GENERATE( + TILEDB_FILTER_POSITIVE_DELTA, + TILEDB_FILTER_DOUBLE_DELTA, + TILEDB_FILTER_DELTA); + tiledb::Filter delta_filter(ctx, delta_compressor); + filters.add_filter(delta_filter); + CHECK_THROWS(d1.set_filter_list(filters)); + CHECK_THROWS(a1.set_filter_list(filters)); + } + + SECTION("- Multiple compressors following type conversion") { + auto compressor = GENERATE( + TILEDB_FILTER_DOUBLE_DELTA, + TILEDB_FILTER_DELTA, + TILEDB_FILTER_GZIP, + TILEDB_FILTER_LZ4, + TILEDB_FILTER_RLE, + TILEDB_FILTER_ZSTD); + tiledb::Filter compressor_filter(ctx, compressor); + tiledb::Filter bzip(ctx, TILEDB_FILTER_BZIP2); + filters.add_filter(float_scale); + filters.add_filter(bzip); + filters.add_filter(compressor_filter); + + CHECK_NOTHROW(d1.set_filter_list(filters)); + CHECK_NOTHROW(a1.set_filter_list(filters)); + } +} diff --git a/tiledb/api/c_api/dimension/dimension_api_internal.h b/tiledb/api/c_api/dimension/dimension_api_internal.h index 1dcb6b1f40e..fe96c866e5e 100644 --- a/tiledb/api/c_api/dimension/dimension_api_internal.h +++ b/tiledb/api/c_api/dimension/dimension_api_internal.h @@ -90,7 +90,7 @@ struct tiledb_dimension_handle_t } inline void set_filter_pipeline(const tiledb::sm::FilterPipeline& x) { - throw_if_not_ok(dimension_->set_filter_pipeline(x)); + dimension_->set_filter_pipeline(x); } inline void set_cell_val_num(unsigned int x) { diff --git a/tiledb/sm/array_schema/array_schema.cc b/tiledb/sm/array_schema/array_schema.cc index ff299b72219..f655320117c 100644 --- a/tiledb/sm/array_schema/array_schema.cc +++ b/tiledb/sm/array_schema/array_schema.cc @@ -99,13 +99,17 @@ ArraySchema::ArraySchema(ArrayType array_type) // Set up default filter pipelines for coords, offsets, and validity values. 
coords_filters_.add_filter(CompressionFilter( - constants::coords_compression, constants::coords_compression_level)); + constants::coords_compression, + constants::coords_compression_level, + Datatype::ANY)); cell_var_offsets_filters_.add_filter(CompressionFilter( constants::cell_var_offsets_compression, - constants::cell_var_offsets_compression_level)); + constants::cell_var_offsets_compression_level, + Datatype::UINT64)); cell_validity_filters_.add_filter(CompressionFilter( constants::cell_validity_compression, - constants::cell_validity_compression_level)); + constants::cell_validity_compression_level, + Datatype::UINT8)); // Generate URI and name for ArraySchema throw_if_not_ok(generate_uri()); @@ -1283,11 +1287,14 @@ ArraySchema ArraySchema::deserialize( // Load filters // Note: Security validation delegated to invoked API - auto coords_filters{FilterPipeline::deserialize(deserializer, version)}; - auto cell_var_filters{FilterPipeline::deserialize(deserializer, version)}; + auto coords_filters{ + FilterPipeline::deserialize(deserializer, version, Datatype::ANY)}; + auto cell_var_filters{ + FilterPipeline::deserialize(deserializer, version, Datatype::UINT64)}; FilterPipeline cell_validity_filters; if (version >= 7) { - cell_validity_filters = FilterPipeline::deserialize(deserializer, version); + cell_validity_filters = + FilterPipeline::deserialize(deserializer, version, Datatype::UINT8); } // Load domain diff --git a/tiledb/sm/array_schema/attribute.cc b/tiledb/sm/array_schema/attribute.cc index 798b2db9253..71463514c65 100644 --- a/tiledb/sm/array_schema/attribute.cc +++ b/tiledb/sm/array_schema/attribute.cc @@ -175,7 +175,8 @@ Attribute Attribute::deserialize( auto cell_val_num = deserializer.read(); // Load filter pipeline - auto filterpipeline{FilterPipeline::deserialize(deserializer, version)}; + auto filterpipeline{ + FilterPipeline::deserialize(deserializer, version, datatype)}; // Load fill value uint64_t fill_value_size = 0; @@ -357,30 +358,7 @@ void Attribute::set_nullable(const bool nullable) { } void Attribute::set_filter_pipeline(const FilterPipeline& pipeline) { - for (unsigned i = 0; i < pipeline.size(); ++i) { - if (datatype_is_real(type_) && - pipeline.get_filter(i)->type() == FilterType::FILTER_DOUBLE_DELTA) - throw AttributeStatusException( - "Cannot set DOUBLE DELTA filter to an attribute with a real " - "datatype"); - } - - if ((type_ == Datatype::STRING_ASCII || type_ == Datatype::STRING_UTF8) && - var_size() && pipeline.size() > 1) { - if (pipeline.has_filter(FilterType::FILTER_RLE) && - pipeline.get_filter(0)->type() != FilterType::FILTER_RLE) { - throw AttributeStatusException( - "RLE filter must be the first filter to apply when used on a " - "variable length string attribute"); - } - if (pipeline.has_filter(FilterType::FILTER_DICTIONARY) && - pipeline.get_filter(0)->type() != FilterType::FILTER_DICTIONARY) { - throw AttributeStatusException( - "Dictionary filter must be the first filter to apply when used on a " - "variable length string attribute"); - } - } - + FilterPipeline::check_filter_types(pipeline, type_, var_size()); filters_ = pipeline; } diff --git a/tiledb/sm/array_schema/dimension.cc b/tiledb/sm/array_schema/dimension.cc index 82b5e2a1e40..2ac6614190f 100644 --- a/tiledb/sm/array_schema/dimension.cc +++ b/tiledb/sm/array_schema/dimension.cc @@ -171,7 +171,8 @@ shared_ptr Dimension::deserialize( cell_val_num = deserializer.read(); // Load filter pipeline - filter_pipeline = FilterPipeline::deserialize(deserializer, version); + filter_pipeline = + 
FilterPipeline::deserialize(deserializer, version, datatype); } else { datatype = type; cell_val_num = (datatype_is_string(datatype)) ? constants::var_num : 1; @@ -1336,32 +1337,9 @@ Status Dimension::set_domain_unsafe(const void* domain) { return Status::Ok(); } -Status Dimension::set_filter_pipeline(const FilterPipeline& pipeline) { - for (unsigned i = 0; i < pipeline.size(); ++i) { - if (datatype_is_real(type_) && - pipeline.get_filter(i)->type() == FilterType::FILTER_DOUBLE_DELTA) - return LOG_STATUS( - Status_DimensionError("Cannot set DOUBLE DELTA filter to a " - "dimension with a real datatype")); - } - - if (type_ == Datatype::STRING_ASCII && var_size() && pipeline.size() > 1) { - if (pipeline.has_filter(FilterType::FILTER_RLE) && - pipeline.get_filter(0)->type() != FilterType::FILTER_RLE) { - return LOG_STATUS(Status_ArraySchemaError( - "RLE filter must be the first filter to apply when used on a " - "variable length string dimension")); - } - if (pipeline.has_filter(FilterType::FILTER_DICTIONARY) && - pipeline.get_filter(0)->type() != FilterType::FILTER_DICTIONARY) { - return LOG_STATUS(Status_ArraySchemaError( - "Dictionary filter must be the first filter to apply when used on a " - "variable length string dimension")); - } - } - +void Dimension::set_filter_pipeline(const FilterPipeline& pipeline) { + FilterPipeline::check_filter_types(pipeline, type_, var_size()); filters_ = pipeline; - return Status::Ok(); } Status Dimension::set_tile_extent(const void* tile_extent) { diff --git a/tiledb/sm/array_schema/dimension.h b/tiledb/sm/array_schema/dimension.h index fb526266681..f2c333da6ef 100644 --- a/tiledb/sm/array_schema/dimension.h +++ b/tiledb/sm/array_schema/dimension.h @@ -715,7 +715,7 @@ class Dimension { Status set_domain_unsafe(const void* domain); /** Sets the filter pipeline for this dimension. */ - Status set_filter_pipeline(const FilterPipeline& pipeline); + void set_filter_pipeline(const FilterPipeline& pipeline); /** Sets the tile extent. */ Status set_tile_extent(const void* tile_extent); diff --git a/tiledb/sm/compressors/delta_compressor.cc b/tiledb/sm/compressors/delta_compressor.cc index 2904a4463b6..70c3794223e 100644 --- a/tiledb/sm/compressors/delta_compressor.cc +++ b/tiledb/sm/compressors/delta_compressor.cc @@ -229,7 +229,7 @@ void Delta::compress(ConstBuffer* input_buffer, Buffer* output_buffer) { // Write first value throw_if_not_ok(output_buffer->write(&in[0], value_size)); if (num > 1) { - for (uint64_t i = 1; i < num + 1; ++i) { + for (uint64_t i = 1; i < num; ++i) { int64_t cur_delta = int64_t(in[i]) - int64_t(in[i - 1]); throw_if_not_ok(output_buffer->write(&cur_delta, value_size)); } diff --git a/tiledb/sm/compressors/delta_compressor.h b/tiledb/sm/compressors/delta_compressor.h index b6371984880..be07f315196 100644 --- a/tiledb/sm/compressors/delta_compressor.h +++ b/tiledb/sm/compressors/delta_compressor.h @@ -74,13 +74,13 @@ class Delta { * any abs(dd_i). * - *n* (uint64_t) is the number of values in the input buffer. * - * TODO fill this in as needed - * * @param type The type of the input values. * @param input_buffer Input buffer to read from. * @param output_buffer Output buffer to write to the compressed data. * - * @note The function will fail with an error if: TODO + * @note The function will fail with an error if: + * a float or otherwise unsupported datatype is used, or + * writing to or reading from the allocated buffers fails.
*/ static void compress( diff --git a/tiledb/sm/compressors/test/unit_delta_compressor.cc b/tiledb/sm/compressors/test/unit_delta_compressor.cc index 132ad585cef..66dd6add00d 100644 --- a/tiledb/sm/compressors/test/unit_delta_compressor.cc +++ b/tiledb/sm/compressors/test/unit_delta_compressor.cc @@ -58,8 +58,7 @@ TEST_CASE("Test delta compression of a vector", "[compression][delta]") { compressed.assign( reinterpret_cast((char*)compressed_buff.data() + 8), reinterpret_cast(compressed_buff.data()) + - (compressed_buff.size() / sizeof(decltype(compressed)::value_type) - - 1)); + (compressed_buff.size() / sizeof(decltype(compressed)::value_type))); CHECK(compressed == expected); } diff --git a/tiledb/sm/filter/bit_width_reduction_filter.cc b/tiledb/sm/filter/bit_width_reduction_filter.cc index 08c14ab6820..df74bc031df 100644 --- a/tiledb/sm/filter/bit_width_reduction_filter.cc +++ b/tiledb/sm/filter/bit_width_reduction_filter.cc @@ -81,13 +81,14 @@ static inline uint8_t bits_required(T value) { return bits_required(value, std::is_signed()); } -BitWidthReductionFilter::BitWidthReductionFilter() - : Filter(FilterType::FILTER_BIT_WIDTH_REDUCTION) { +BitWidthReductionFilter::BitWidthReductionFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_BIT_WIDTH_REDUCTION, filter_data_type) { max_window_size_ = 256; } -BitWidthReductionFilter::BitWidthReductionFilter(uint32_t max_window_size) - : Filter(FilterType::FILTER_BIT_WIDTH_REDUCTION) +BitWidthReductionFilter::BitWidthReductionFilter( + uint32_t max_window_size, Datatype filter_data_type) + : Filter(FilterType::FILTER_BIT_WIDTH_REDUCTION, filter_data_type) , max_window_size_(max_window_size) { } @@ -97,6 +98,14 @@ void BitWidthReductionFilter::dump(FILE* out) const { fprintf(out, "BitWidthReduction: BIT_WIDTH_MAX_WINDOW=%u", max_window_size_); } +bool BitWidthReductionFilter::accepts_input_datatype(Datatype datatype) const { + if (datatype_is_integer(datatype) || datatype_is_datetime(datatype) || + datatype_is_time(datatype) || datatype_is_string(datatype)) { + return true; + } + return false; +} + Status BitWidthReductionFilter::run_forward( const WriterTile& tile, WriterTile* const offsets_tile, @@ -104,12 +113,12 @@ Status BitWidthReductionFilter::run_forward( FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + auto data_type_size = static_cast(datatype_size(filter_data_type_)); // If bit width compression can't work, just return the input unmodified. - if ((!datatype_is_integer(tile_type) && tile_type != Datatype::BLOB) || - tile_type_size == 1) { + if ((!datatype_is_integer(filter_data_type_) && + filter_data_type_ != Datatype::BLOB) || + data_type_size == 1) { RETURN_NOT_OK(output->append_view(input)); RETURN_NOT_OK(output_metadata->append_view(input_metadata)); return Status::Ok(); @@ -117,8 +126,8 @@ Status BitWidthReductionFilter::run_forward( /* Note: Arithmetic operations cannot be performed on std::byte. We will use uint8_t for the Datatype::BLOB case as it is the same size as - std::byte and can have arithmetic perfomed on it. */ - switch (tile_type) { + std::byte and can have arithmetic performed on it. 
*/ + switch (filter_data_type_) { case Datatype::INT8: return run_forward( tile, offsets_tile, input_metadata, input, output_metadata, output); @@ -288,13 +297,12 @@ Status BitWidthReductionFilter::run_reverse( FilterBuffer* output, const Config& config) const { (void)config; - - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + auto data_type_size = static_cast(datatype_size(filter_data_type_)); // If bit width compression wasn't applied, just return the input unmodified. - if ((!datatype_is_integer(tile_type) && tile_type != Datatype::BLOB) || - tile_type_size == 1) { + if ((!datatype_is_integer(filter_data_type_) && + filter_data_type_ != Datatype::BLOB) || + data_type_size == 1) { RETURN_NOT_OK(output->append_view(input)); RETURN_NOT_OK(output_metadata->append_view(input_metadata)); return Status::Ok(); @@ -303,7 +311,7 @@ Status BitWidthReductionFilter::run_reverse( /* Note: Arithmetic operations cannot be performed on std::byte. We will use uint8_t for the Datatype::BLOB case as it is the same size as std::byte and can have arithmetic perfomed on it. */ - switch (tile_type) { + switch (filter_data_type_) { case Datatype::INT8: return run_reverse( tile, offsets_tile, input_metadata, input, output_metadata, output); @@ -362,14 +370,13 @@ Status BitWidthReductionFilter::run_reverse( template Status BitWidthReductionFilter::run_reverse( - const Tile& tile, + const Tile&, Tile* const, FilterBuffer* input_metadata, FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = datatype_size(tile_type); + auto data_type_size = datatype_size(filter_data_type_); uint32_t num_windows, orig_length; RETURN_NOT_OK(input_metadata->read(&orig_length, sizeof(uint32_t))); @@ -384,7 +391,7 @@ Status BitWidthReductionFilter::run_reverse( T window_value_offset; uint8_t orig_bits = sizeof(T) * 8, compressed_bits; // Read window header - RETURN_NOT_OK(input_metadata->read(&window_value_offset, tile_type_size)); + RETURN_NOT_OK(input_metadata->read(&window_value_offset, data_type_size)); RETURN_NOT_OK(input_metadata->read(&compressed_bits, sizeof(uint8_t))); RETURN_NOT_OK(input_metadata->read(&window_nbytes, sizeof(uint32_t))); @@ -400,7 +407,7 @@ Status BitWidthReductionFilter::run_reverse( RETURN_NOT_OK( read_compressed_value(input, compressed_bits, &input_value)); input_value += window_value_offset; - RETURN_NOT_OK(output->write(&input_value, tile_type_size)); + RETURN_NOT_OK(output->write(&input_value, data_type_size)); } } } @@ -569,7 +576,7 @@ void BitWidthReductionFilter::set_max_window_size(uint32_t max_window_size) { } BitWidthReductionFilter* BitWidthReductionFilter::clone_impl() const { - auto clone = new BitWidthReductionFilter; + auto clone = tdb_new(BitWidthReductionFilter, filter_data_type_); clone->max_window_size_ = max_window_size_; return clone; } diff --git a/tiledb/sm/filter/bit_width_reduction_filter.h b/tiledb/sm/filter/bit_width_reduction_filter.h index 54ab294f935..ebcab1ec93d 100644 --- a/tiledb/sm/filter/bit_width_reduction_filter.h +++ b/tiledb/sm/filter/bit_width_reduction_filter.h @@ -42,8 +42,8 @@ namespace tiledb { namespace sm { /** - * A filter that compresses an array of unsigned integers by reducing the number - * of bits per element if possible. + * A filter that compresses an array of integers by reducing the number of bits + * per element if possible. 
* * When compressing, the filter determines the min and max values of the input * elements within a window of size N. If the range of values can be represented @@ -82,18 +82,31 @@ namespace sm { */ class BitWidthReductionFilter : public Filter { public: - /** Constructor. */ - BitWidthReductionFilter(); + /** + * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. + */ + BitWidthReductionFilter(Datatype filter_data_type); - /** Constructor. + /** + * Constructor. * - * @param max_window_size + * @param max_window_size Window size in bytes to apply bit width reduction. + * @param filter_data_type Datatype the filter will operate on. */ - BitWidthReductionFilter(uint32_t max_window_size); + BitWidthReductionFilter(uint32_t max_window_size, Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. */ void dump(FILE* out) const override; + /** + * Checks if the filter is applicable to the input datatype. + * + * @param datatype Input datatype to check filter compatibility. + */ + bool accepts_input_datatype(Datatype datatype) const override; + /** Return the max window size used by the filter. */ uint32_t max_window_size() const; diff --git a/tiledb/sm/filter/bitshuffle_filter.cc b/tiledb/sm/filter/bitshuffle_filter.cc index 9487f457439..65cb521994b 100644 --- a/tiledb/sm/filter/bitshuffle_filter.cc +++ b/tiledb/sm/filter/bitshuffle_filter.cc @@ -43,12 +43,12 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -BitshuffleFilter::BitshuffleFilter() - : Filter(FilterType::FILTER_BITSHUFFLE) { +BitshuffleFilter::BitshuffleFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_BITSHUFFLE, filter_data_type) { } BitshuffleFilter* BitshuffleFilter::clone_impl() const { - return new BitshuffleFilter; + return tdb_new(BitshuffleFilter, filter_data_type_); } void BitshuffleFilter::dump(FILE* out) const { @@ -65,8 +65,7 @@ Status BitshuffleFilter::run_forward( FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); // Output size does not change with this filter. 
RETURN_NOT_OK(output->prepend_buffer(input->size())); @@ -126,9 +125,8 @@ Status BitshuffleFilter::compute_parts( } Status BitshuffleFilter::shuffle_part( - const WriterTile& tile, const ConstBuffer* part, Buffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + const WriterTile&, const ConstBuffer* part, Buffer* output) const { + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); auto part_nelts = part->size() / tile_type_size; auto bytes_processed = bshuf_bitshuffle( part->data(), output->cur_data(), part_nelts, tile_type_size, 0); @@ -172,9 +170,7 @@ Status BitshuffleFilter::run_reverse( FilterBuffer* output, const Config& config) const { (void)config; - - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); // Get number of parts uint32_t num_parts; @@ -213,9 +209,8 @@ Status BitshuffleFilter::run_reverse( } Status BitshuffleFilter::unshuffle_part( - const Tile& tile, const ConstBuffer* part, Buffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + const Tile&, const ConstBuffer* part, Buffer* output) const { + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); auto part_nelts = part->size() / tile_type_size; auto bytes_processed = bshuf_bitunshuffle( part->data(), output->cur_data(), part_nelts, tile_type_size, 0); diff --git a/tiledb/sm/filter/bitshuffle_filter.h b/tiledb/sm/filter/bitshuffle_filter.h index f38239df3fe..c1f9ca354cc 100644 --- a/tiledb/sm/filter/bitshuffle_filter.h +++ b/tiledb/sm/filter/bitshuffle_filter.h @@ -76,8 +76,10 @@ class BitshuffleFilter : public Filter { public: /** * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. */ - BitshuffleFilter(); + BitshuffleFilter(Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. 
*/ void dump(FILE* out) const override; diff --git a/tiledb/sm/filter/byteshuffle_filter.cc b/tiledb/sm/filter/byteshuffle_filter.cc index ab458692f87..d6d3012484a 100644 --- a/tiledb/sm/filter/byteshuffle_filter.cc +++ b/tiledb/sm/filter/byteshuffle_filter.cc @@ -43,12 +43,12 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -ByteshuffleFilter::ByteshuffleFilter() - : Filter(FilterType::FILTER_BYTESHUFFLE) { +ByteshuffleFilter::ByteshuffleFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_BYTESHUFFLE, filter_data_type) { } ByteshuffleFilter* ByteshuffleFilter::clone_impl() const { - return new ByteshuffleFilter; + return tdb_new(ByteshuffleFilter, filter_data_type_); } void ByteshuffleFilter::dump(FILE* out) const { @@ -94,9 +94,8 @@ Status ByteshuffleFilter::run_forward( } Status ByteshuffleFilter::shuffle_part( - const WriterTile& tile, const ConstBuffer* part, Buffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + const WriterTile&, const ConstBuffer* part, Buffer* output) const { + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); blosc::shuffle( tile_type_size, @@ -149,9 +148,8 @@ Status ByteshuffleFilter::run_reverse( } Status ByteshuffleFilter::unshuffle_part( - const Tile& tile, const ConstBuffer* part, Buffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + const Tile&, const ConstBuffer* part, Buffer* output) const { + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); blosc::unshuffle( tile_type_size, diff --git a/tiledb/sm/filter/byteshuffle_filter.h b/tiledb/sm/filter/byteshuffle_filter.h index 112070ccb00..ef25bf97214 100644 --- a/tiledb/sm/filter/byteshuffle_filter.h +++ b/tiledb/sm/filter/byteshuffle_filter.h @@ -68,8 +68,10 @@ class ByteshuffleFilter : public Filter { public: /** * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. */ - ByteshuffleFilter(); + ByteshuffleFilter(Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. */ void dump(FILE* out) const override; diff --git a/tiledb/sm/filter/checksum_md5_filter.cc b/tiledb/sm/filter/checksum_md5_filter.cc index f4175c6a9f8..9fa157c0af2 100644 --- a/tiledb/sm/filter/checksum_md5_filter.cc +++ b/tiledb/sm/filter/checksum_md5_filter.cc @@ -45,12 +45,12 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -ChecksumMD5Filter::ChecksumMD5Filter() - : Filter(FilterType::FILTER_CHECKSUM_MD5) { +ChecksumMD5Filter::ChecksumMD5Filter(Datatype filter_data_type) + : Filter(FilterType::FILTER_CHECKSUM_MD5, filter_data_type) { } ChecksumMD5Filter* ChecksumMD5Filter::clone_impl() const { - return tdb_new(ChecksumMD5Filter); + return tdb_new(ChecksumMD5Filter, filter_data_type_); } void ChecksumMD5Filter::dump(FILE* out) const { diff --git a/tiledb/sm/filter/checksum_md5_filter.h b/tiledb/sm/filter/checksum_md5_filter.h index e30a2566250..54462895511 100644 --- a/tiledb/sm/filter/checksum_md5_filter.h +++ b/tiledb/sm/filter/checksum_md5_filter.h @@ -43,12 +43,11 @@ namespace sm { /** * A filter that computes a checksum of the input data into the output data - buffer with - * user specified algorithm. + * buffer with user specified algorithm. * * If the input comes in multiple FilterBuffer parts, each part is checksummed * independently in the forward direction. Input metadata is checksummed as - well. + * well. 
* * The forward output metadata has the format: * uint32_t - number of metadata checksums @@ -74,8 +73,10 @@ class ChecksumMD5Filter : public Filter { public: /** * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. */ - ChecksumMD5Filter(); + ChecksumMD5Filter(Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. */ void dump(FILE* out) const override; diff --git a/tiledb/sm/filter/checksum_sha256_filter.cc b/tiledb/sm/filter/checksum_sha256_filter.cc index 71e6685dd8e..2a6fbdf89c1 100644 --- a/tiledb/sm/filter/checksum_sha256_filter.cc +++ b/tiledb/sm/filter/checksum_sha256_filter.cc @@ -45,12 +45,12 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -ChecksumSHA256Filter::ChecksumSHA256Filter() - : Filter(FilterType::FILTER_CHECKSUM_SHA256) { +ChecksumSHA256Filter::ChecksumSHA256Filter(Datatype filter_data_type) + : Filter(FilterType::FILTER_CHECKSUM_SHA256, filter_data_type) { } ChecksumSHA256Filter* ChecksumSHA256Filter::clone_impl() const { - return tdb_new(ChecksumSHA256Filter); + return tdb_new(ChecksumSHA256Filter, filter_data_type_); } void ChecksumSHA256Filter::dump(FILE* out) const { diff --git a/tiledb/sm/filter/checksum_sha256_filter.h b/tiledb/sm/filter/checksum_sha256_filter.h index 4a3f4233ef6..fdc79cb1547 100644 --- a/tiledb/sm/filter/checksum_sha256_filter.h +++ b/tiledb/sm/filter/checksum_sha256_filter.h @@ -74,8 +74,10 @@ class ChecksumSHA256Filter : public Filter { public: /** * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. */ - ChecksumSHA256Filter(); + ChecksumSHA256Filter(Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. */ void dump(FILE* out) const override; diff --git a/tiledb/sm/filter/compression_filter.cc b/tiledb/sm/filter/compression_filter.cc index e96c086afda..c3164c8b8be 100644 --- a/tiledb/sm/filter/compression_filter.cc +++ b/tiledb/sm/filter/compression_filter.cc @@ -54,13 +54,20 @@ using namespace tiledb::common; namespace tiledb { namespace sm { +class CompressionFilterStatusException : public StatusException { + public: + explicit CompressionFilterStatusException(const std::string& msg) + : StatusException("CompressionFilter", msg) { + } +}; CompressionFilter::CompressionFilter( FilterType compressor, int level, + Datatype filter_data_type, Datatype reinterpret_type, const format_version_t version) - : Filter(compressor) + : Filter(compressor, filter_data_type) , compressor_(filter_to_compressor(compressor)) , level_(level) , version_(version) @@ -72,9 +79,10 @@ CompressionFilter::CompressionFilter( CompressionFilter::CompressionFilter( Compressor compressor, int level, + Datatype filter_data_type, Datatype reinterpret_type, const format_version_t version) - : Filter(compressor_to_filter(compressor)) + : Filter(compressor_to_filter(compressor), filter_data_type) , compressor_(compressor) , level_(level) , version_(version) @@ -87,6 +95,21 @@ Compressor CompressionFilter::compressor() const { return compressor_; } +bool CompressionFilter::accepts_input_datatype(Datatype input_type) const { + auto this_filter_type = compressor_to_filter(compressor_); + if (this_filter_type == FilterType::FILTER_DOUBLE_DELTA || + this_filter_type == FilterType::FILTER_DELTA) { + // Delta filters do not accept floating point types. + if (datatype_is_real( + reinterpret_datatype_ != Datatype::ANY ? 
reinterpret_datatype_ : + input_type)) { + return false; + } + } + + return true; +} + int CompressionFilter::compression_level() const { return level_; } @@ -96,7 +119,8 @@ void CompressionFilter::dump(FILE* out) const { out = stdout; std::string compressor_str = tiledb::sm::compressor_str(compressor_); - if (compressor_ == Compressor::DELTA) { + if (compressor_ == Compressor::DELTA || + compressor_ == Compressor::DOUBLE_DELTA) { fprintf( out, "%s: COMPRESSION_LEVEL=%i, REINTERPRET_DATATYPE=%s", @@ -110,7 +134,12 @@ void CompressionFilter::dump(FILE* out) const { CompressionFilter* CompressionFilter::clone_impl() const { return tdb_new( - CompressionFilter, compressor_, level_, reinterpret_datatype_, version_); + CompressionFilter, + compressor_, + level_, + filter_data_type_, + reinterpret_datatype_, + version_); } void CompressionFilter::set_compressor(Compressor compressor) { @@ -230,8 +259,8 @@ Status CompressionFilter::run_forward( return LOG_STATUS( Status_FilterError("Input is too large to be compressed.")); - if ((tile.type() == Datatype::STRING_ASCII || - tile.type() == Datatype::STRING_UTF8) && + if ((filter_data_type_ == Datatype::STRING_ASCII || + filter_data_type_ == Datatype::STRING_UTF8) && offsets_tile) { if (compressor_ == Compressor::RLE || compressor_ == Compressor::DICTIONARY_ENCODING) @@ -304,8 +333,8 @@ Status CompressionFilter::run_reverse( Buffer* metadata_buffer = output_metadata->buffer_ptr(0); assert(metadata_buffer != nullptr); - if ((tile.type() == Datatype::STRING_ASCII || - tile.type() == Datatype::STRING_UTF8) && + if ((filter_data_type_ == Datatype::STRING_ASCII || + filter_data_type_ == Datatype::STRING_UTF8) && version_ >= 12 && offsets_tile) { if (compressor_ == Compressor::RLE || compressor_ == Compressor::DICTIONARY_ENCODING) @@ -331,7 +360,6 @@ Status CompressionFilter::compress_part( ConstBuffer input_buffer(part->data(), part->size()); auto cell_size = tile.cell_size(); - auto type = tile.type(); // Invoke the proper compressor uint32_t orig_size = (uint32_t)output->size(); @@ -353,7 +381,11 @@ Status CompressionFilter::compress_part( RETURN_NOT_OK(BZip::compress(level_, &input_buffer, output)); break; case Compressor::DOUBLE_DELTA: - RETURN_NOT_OK(DoubleDelta::compress(type, &input_buffer, output)); + RETURN_NOT_OK(DoubleDelta::compress( + reinterpret_datatype_ == Datatype::ANY ? filter_data_type_ : + reinterpret_datatype_, + &input_buffer, + output)); break; case Compressor::DICTIONARY_ENCODING: return LOG_STATUS( @@ -362,7 +394,8 @@ Status CompressionFilter::compress_part( case Compressor::DELTA: // Use schema type if REINTERPRET_TYPE option is not set. Delta::compress( - reinterpret_datatype_ == Datatype::ANY ? type : reinterpret_datatype_, + reinterpret_datatype_ == Datatype::ANY ? filter_data_type_ : + reinterpret_datatype_, &input_buffer, output); break; @@ -389,7 +422,6 @@ Status CompressionFilter::decompress_part( Buffer* output, FilterBuffer* input_metadata) const { auto cell_size = tile.cell_size(); - auto type = tile.type(); // Read the part metadata uint32_t compressed_size, uncompressed_size; @@ -433,12 +465,17 @@ Status CompressionFilter::decompress_part( break; case Compressor::DELTA: Delta::decompress( - reinterpret_datatype_ == Datatype::ANY ? type : reinterpret_datatype_, + reinterpret_datatype_ == Datatype::ANY ? 
filter_data_type_ : + reinterpret_datatype_, &input_buffer, &output_buffer); break; case Compressor::DOUBLE_DELTA: - st = DoubleDelta::decompress(type, &input_buffer, &output_buffer); + st = DoubleDelta::decompress( + reinterpret_datatype_ == Datatype::ANY ? filter_data_type_ : + reinterpret_datatype_, + &input_buffer, + &output_buffer); break; case Compressor::DICTIONARY_ENCODING: return LOG_STATUS( @@ -496,7 +533,7 @@ Status CompressionFilter::compress_var_string_coords( FilterBuffer& output, FilterBuffer& output_metadata) const { if (input.num_buffers() != 1) { - throw std::logic_error( + throw CompressionFilterStatusException( "Var-sized string input has to be in single " "buffer format to be compressed with RLE or Dictionary encoding"); } @@ -583,7 +620,7 @@ Status CompressionFilter::decompress_var_string_coords( Tile* offsets_tile, FilterBuffer& output) const { if (input.num_buffers() != 1) { - throw std::logic_error( + throw CompressionFilterStatusException( "Var-sized string input has to be in single " "buffer format to be decompressed with RLE or Dictionary encoding"); } @@ -669,7 +706,8 @@ void CompressionFilter::serialize_impl(Serializer& serializer) const { auto compressor_char = static_cast(compressor_); serializer.write(compressor_char); serializer.write(level_); - if (compressor_ == Compressor::DELTA) { + if (compressor_ == Compressor::DELTA || + compressor_ == Compressor::DOUBLE_DELTA) { serializer.write(static_cast(reinterpret_datatype_)); } } @@ -692,5 +730,16 @@ void CompressionFilter::init_decompression_resource_pool(uint64_t size) { } } +Datatype CompressionFilter::output_datatype(Datatype datatype) const { + switch (compressor_) { + case Compressor::DOUBLE_DELTA: + case Compressor::DELTA: + return reinterpret_datatype_ == Datatype::ANY ? datatype : + reinterpret_datatype_; + default: + return datatype; + } +} + } // namespace sm } // namespace tiledb diff --git a/tiledb/sm/filter/compression_filter.h b/tiledb/sm/filter/compression_filter.h index d9100aac1b9..dcfe1943da9 100644 --- a/tiledb/sm/filter/compression_filter.h +++ b/tiledb/sm/filter/compression_filter.h @@ -84,12 +84,14 @@ class CompressionFilter : public Filter { * * @param compressor Compressor to use * @param level Compression level to use + * @param filter_data_type Datatype the compressor will operate on. * @param reinterpret_type Type to reinterpret data prior to compression. * @param version Format version */ CompressionFilter( FilterType compressor, int level, + Datatype filter_data_type, Datatype reinterpret_type = Datatype::ANY, const format_version_t version = constants::format_version); @@ -98,18 +100,23 @@ class CompressionFilter : public Filter { * * @param compressor Compressor to use * @param level Compression level to use + * @param filter_data_type Datatype the compressor will operate on. * @param reinterpret_type Type to reinterpret data prior to compression. * @param version Format version */ CompressionFilter( Compressor compressor, int level, + Datatype filter_data_type, Datatype reinterpret_type = Datatype::ANY, const format_version_t version = constants::format_version); /** Return the compressor used by this filter instance. */ Compressor compressor() const; + /** Return whether the compression filter accepts given Datatype */ + bool accepts_input_datatype(Datatype type) const override; + /** Return the compression level used by this filter instance. 
*/ int compression_level() const; @@ -246,8 +253,10 @@ class CompressionFilter : public Filter { /** Initializes the decompression resource pool */ void init_decompression_resource_pool(uint64_t size) override; - /** Creates a vector of views of the input strings and returns the max string - * size */ + /** + * Creates a vector of views of the input strings and returns the max string + * size + */ static tuple, uint64_t> create_input_view( const FilterBuffer& input, WriterTile* const offsets_tile); @@ -258,6 +267,15 @@ class CompressionFilter : public Filter { * @return Number of bytes required to store the input number */ static uint8_t compute_bytesize(uint64_t param_length); + + /** + * Returns the filter output type + * + * @param input_type Expected type used for input. Used for filters which + * change output type based on input data. e.g. XORFilter output type is + * based on byte width of input type. + */ + Datatype output_datatype(Datatype input_type) const override; }; } // namespace sm diff --git a/tiledb/sm/filter/encryption_aes256gcm_filter.cc b/tiledb/sm/filter/encryption_aes256gcm_filter.cc index bd002784ef0..dba2b9d19ce 100644 --- a/tiledb/sm/filter/encryption_aes256gcm_filter.cc +++ b/tiledb/sm/filter/encryption_aes256gcm_filter.cc @@ -44,20 +44,20 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -EncryptionAES256GCMFilter::EncryptionAES256GCMFilter() - : Filter(FilterType::INTERNAL_FILTER_AES_256_GCM) { +EncryptionAES256GCMFilter::EncryptionAES256GCMFilter(Datatype filter_data_type) + : Filter(FilterType::INTERNAL_FILTER_AES_256_GCM, filter_data_type) { set_key(nullptr); } EncryptionAES256GCMFilter::EncryptionAES256GCMFilter( - const EncryptionKey& encryption_key) - : Filter(FilterType::INTERNAL_FILTER_AES_256_GCM) { + const EncryptionKey& encryption_key, Datatype filter_data_type) + : Filter(FilterType::INTERNAL_FILTER_AES_256_GCM, filter_data_type) { auto buff = encryption_key.key(); set_key(buff.data()); } EncryptionAES256GCMFilter* EncryptionAES256GCMFilter::clone_impl() const { - auto clone = new EncryptionAES256GCMFilter; + auto clone = tdb_new(EncryptionAES256GCMFilter, filter_data_type_); // Copy key bytes buffer. clone->key_bytes_ = key_bytes_; return clone; diff --git a/tiledb/sm/filter/encryption_aes256gcm_filter.h b/tiledb/sm/filter/encryption_aes256gcm_filter.h index 06711681fd2..31c8bdb252f 100644 --- a/tiledb/sm/filter/encryption_aes256gcm_filter.h +++ b/tiledb/sm/filter/encryption_aes256gcm_filter.h @@ -80,15 +80,19 @@ class EncryptionAES256GCMFilter : public Filter { public: /** * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. */ - EncryptionAES256GCMFilter(); + EncryptionAES256GCMFilter(Datatype filter_data_type); /** * Constructor with explicit key. * * @param key Key to use for the filter. + * @param filter_data_type Datatype the filter will operate on. */ - explicit EncryptionAES256GCMFilter(const EncryptionKey& key); + explicit EncryptionAES256GCMFilter( + const EncryptionKey& key, Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. 
*/ void dump(FILE* out) const override; diff --git a/tiledb/sm/filter/filter.cc b/tiledb/sm/filter/filter.cc index 6e5cf0b179e..4c691a32dfc 100644 --- a/tiledb/sm/filter/filter.cc +++ b/tiledb/sm/filter/filter.cc @@ -34,22 +34,52 @@ #include "tiledb/common/common.h" #include "tiledb/common/logger_public.h" #include "tiledb/sm/buffer/buffer.h" +#include "tiledb/sm/enums/datatype.h" +#include "tiledb/sm/enums/filter_type.h" using namespace tiledb::common; namespace tiledb { namespace sm { - -Filter::Filter(FilterType type) { +class FilterStatusException : public StatusException { + public: + explicit FilterStatusException(const std::string& msg) + : StatusException("Filter", msg) { + } +}; + +Filter::Filter(FilterType type, Datatype filter_data_type) { type_ = type; + filter_data_type_ = filter_data_type; } Filter* Filter::clone() const { // Call subclass-specific clone function auto clone = clone_impl(); + clone->filter_data_type_ = filter_data_type_; return clone; } +Datatype Filter::output_datatype(Datatype datatype) const { + return datatype; +} + +void Filter::ensure_accepts_datatype(Datatype datatype) const { + if (this->type() == FilterType::FILTER_NONE) { + return; + } + + if (!this->accepts_input_datatype(datatype)) { + throw FilterStatusException( + "Filter " + filter_type_str(this->type()) + + " does not accept input type " + datatype_str(datatype)); + } +} + +bool Filter::accepts_input_datatype(Datatype) const { + return true; +}; + Status Filter::get_option(FilterOption option, void* value) const { if (value == nullptr) return LOG_STATUS( diff --git a/tiledb/sm/filter/filter.h b/tiledb/sm/filter/filter.h index 243219ec260..af22aad9825 100644 --- a/tiledb/sm/filter/filter.h +++ b/tiledb/sm/filter/filter.h @@ -51,6 +51,7 @@ class WriterTile; enum class FilterOption : uint8_t; enum class FilterType : uint8_t; +enum class Datatype : uint8_t; /** * A Filter processes or modifies a byte region, modifying it in place, or @@ -61,8 +62,12 @@ enum class FilterType : uint8_t; */ class Filter { public: - /** Constructor. */ - explicit Filter(FilterType type); + /** + * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. + */ + explicit Filter(FilterType type, Datatype filter_data_type); /** Destructor. */ virtual ~Filter() = default; @@ -76,6 +81,29 @@ class Filter { /** Dumps the filter details in ASCII format in the selected output. */ virtual void dump(FILE* out) const = 0; + /** + * Returns the filter output type + * + * @param input_type Expected type used for input. Used for filters which + * change output type based on input data. e.g. XORFilter output type is + * based on byte width of input type. + */ + virtual Datatype output_datatype(Datatype input_type) const; + + /** + * Throws if given data type *cannot* be handled by this filter. + * + * @param datatype Input datatype to check filter compatibility. + */ + void ensure_accepts_datatype(Datatype datatype) const; + + /** + * Checks if the filter is applicable to the input datatype. + * + * @param type Input datatype to check filter compatibility. + */ + virtual bool accepts_input_datatype(Datatype type) const; + /** * Gets an option from this filter. * @@ -179,6 +207,9 @@ class Filter { /** The filter type. */ FilterType type_; + /** The datatype this filter will operate on within the pipeline. */ + Datatype filter_data_type_; + /** * Clone function must implemented by each specific Filter subclass. 
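
The new base-class hooks are easiest to see with a small usage sketch. This is not part of the patch; it assumes the internal headers touched in this diff (`compression_filter.h`, `filter.h`, and the enum headers) are on the include path, and the function name is illustrative.

```cpp
// Sketch only (not part of this patch); assumes the internal headers changed
// in this diff are available.
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/enums/filter_type.h"
#include "tiledb/sm/filter/compression_filter.h"

using tiledb::sm::CompressionFilter;
using tiledb::sm::Datatype;
using tiledb::sm::FilterType;

void datatype_hooks_sketch() {
  // DOUBLE_DELTA on FLOAT32 data is rejected: delta compressors do not accept
  // floating-point input.
  CompressionFilter dd(FilterType::FILTER_DOUBLE_DELTA, -1, Datatype::FLOAT32);
  bool ok = dd.accepts_input_datatype(Datatype::FLOAT32);  // false
  // dd.ensure_accepts_datatype(Datatype::FLOAT32) would throw here.

  // With a reinterpret datatype set, the check runs against the reinterpreted
  // (integer) type instead.
  CompressionFilter dd_reinterpreted(
      FilterType::FILTER_DOUBLE_DELTA, -1, Datatype::FLOAT32, Datatype::UINT32);
  bool ok_reinterpreted =
      dd_reinterpreted.accepts_input_datatype(Datatype::FLOAT32);  // true

  // For delta compressors the output type is the (possibly reinterpreted)
  // input type.
  auto next = dd_reinterpreted.output_datatype(Datatype::FLOAT32);  // UINT32
  (void)ok;
  (void)ok_reinterpreted;
  (void)next;
}
```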
This is * used instead of copy constructors to allow for base-typed Filter instances diff --git a/tiledb/sm/filter/filter_create.cc b/tiledb/sm/filter/filter_create.cc index e5e3011d7b1..7d424c0c0c1 100644 --- a/tiledb/sm/filter/filter_create.cc +++ b/tiledb/sm/filter/filter_create.cc @@ -55,7 +55,7 @@ tiledb::sm::Filter* tiledb::sm::FilterCreate::make(FilterType type) { switch (type) { case tiledb::sm::FilterType::FILTER_NONE: - return tdb_new(tiledb::sm::NoopFilter); + return tdb_new(tiledb::sm::NoopFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_GZIP: case tiledb::sm::FilterType::FILTER_ZSTD: case tiledb::sm::FilterType::FILTER_LZ4: @@ -64,28 +64,28 @@ tiledb::sm::Filter* tiledb::sm::FilterCreate::make(FilterType type) { case tiledb::sm::FilterType::FILTER_DELTA: case tiledb::sm::FilterType::FILTER_DOUBLE_DELTA: case tiledb::sm::FilterType::FILTER_DICTIONARY: - return tdb_new(tiledb::sm::CompressionFilter, type, -1); + return tdb_new(tiledb::sm::CompressionFilter, type, -1, Datatype::ANY); case tiledb::sm::FilterType::FILTER_BIT_WIDTH_REDUCTION: - return tdb_new(tiledb::sm::BitWidthReductionFilter); + return tdb_new(tiledb::sm::BitWidthReductionFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_BITSHUFFLE: - return tdb_new(tiledb::sm::BitshuffleFilter); + return tdb_new(tiledb::sm::BitshuffleFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_BYTESHUFFLE: - return tdb_new(tiledb::sm::ByteshuffleFilter); + return tdb_new(tiledb::sm::ByteshuffleFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_POSITIVE_DELTA: - return tdb_new(tiledb::sm::PositiveDeltaFilter); + return tdb_new(tiledb::sm::PositiveDeltaFilter, Datatype::ANY); case tiledb::sm::FilterType::INTERNAL_FILTER_AES_256_GCM: - return tdb_new(tiledb::sm::EncryptionAES256GCMFilter); + return tdb_new(tiledb::sm::EncryptionAES256GCMFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_CHECKSUM_MD5: - return tdb_new(tiledb::sm::ChecksumMD5Filter); + return tdb_new(tiledb::sm::ChecksumMD5Filter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_CHECKSUM_SHA256: - return tdb_new(tiledb::sm::ChecksumSHA256Filter); + return tdb_new(tiledb::sm::ChecksumSHA256Filter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_SCALE_FLOAT: - return tdb_new(tiledb::sm::FloatScalingFilter); + return tdb_new(tiledb::sm::FloatScalingFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_XOR: - return tdb_new(tiledb::sm::XORFilter); + return tdb_new(tiledb::sm::XORFilter, Datatype::ANY); case tiledb::sm::FilterType::FILTER_WEBP: { if constexpr (webp_filter_exists) { - return tdb_new(tiledb::sm::WebpFilter); + return tdb_new(tiledb::sm::WebpFilter, Datatype::ANY); } else { throw WebpNotPresentError(); } @@ -100,7 +100,8 @@ tiledb::sm::Filter* tiledb::sm::FilterCreate::make(FilterType type) { shared_ptr tiledb::sm::FilterCreate::deserialize( Deserializer& deserializer, const EncryptionKey& encryption_key, - const uint32_t version) { + const uint32_t version, + Datatype datatype) { Status st; uint8_t type = deserializer.read(); FilterType filtertype = static_cast(type); @@ -113,7 +114,7 @@ shared_ptr tiledb::sm::FilterCreate::deserialize( switch (filtertype) { case FilterType::FILTER_NONE: - return make_shared(HERE()); + return make_shared(HERE(), datatype); case FilterType::FILTER_GZIP: case FilterType::FILTER_ZSTD: case FilterType::FILTER_LZ4: @@ -125,37 +126,46 @@ shared_ptr tiledb::sm::FilterCreate::deserialize( uint8_t compressor_char = deserializer.read(); int compression_level = deserializer.read(); 
Datatype reinterpret_type = Datatype::ANY; - if (filtertype == FilterType::FILTER_DELTA) { + if (version >= 20 && (filtertype == FilterType::FILTER_DELTA || + filtertype == FilterType::FILTER_DOUBLE_DELTA)) { uint8_t reinterpret = deserializer.read(); reinterpret_type = static_cast(reinterpret); } Compressor compressor = static_cast(compressor_char); return make_shared( - HERE(), compressor, compression_level, reinterpret_type, version); + HERE(), + compressor, + compression_level, + datatype, + reinterpret_type, + version); } case FilterType::FILTER_BIT_WIDTH_REDUCTION: { uint32_t max_window_size = deserializer.read(); - return make_shared(HERE(), max_window_size); + return make_shared( + HERE(), max_window_size, datatype); } case FilterType::FILTER_BITSHUFFLE: - return make_shared(HERE()); + return make_shared(HERE(), datatype); case FilterType::FILTER_BYTESHUFFLE: - return make_shared(HERE()); + return make_shared(HERE(), datatype); case FilterType::FILTER_POSITIVE_DELTA: { uint32_t max_window_size = deserializer.read(); - return make_shared(HERE(), max_window_size); + return make_shared( + HERE(), max_window_size, datatype); } case FilterType::INTERNAL_FILTER_AES_256_GCM: if (encryption_key.encryption_type() == tiledb::sm::EncryptionType::AES_256_GCM) { - return make_shared(HERE(), encryption_key); + return make_shared( + HERE(), encryption_key, datatype); } else { - return make_shared(HERE()); + return make_shared(HERE(), datatype); } case FilterType::FILTER_CHECKSUM_MD5: - return make_shared(HERE()); + return make_shared(HERE(), datatype); case FilterType::FILTER_CHECKSUM_SHA256: - return make_shared(HERE()); + return make_shared(HERE(), datatype); case FilterType::FILTER_SCALE_FLOAT: { auto filter_config = deserializer.read(); @@ -163,10 +173,11 @@ shared_ptr tiledb::sm::FilterCreate::deserialize( HERE(), filter_config.byte_width, filter_config.scale, - filter_config.offset); + filter_config.offset, + datatype); }; case FilterType::FILTER_XOR: { - return make_shared(HERE()); + return make_shared(HERE(), datatype); }; case FilterType::FILTER_WEBP: { if constexpr (webp_filter_exists) { @@ -177,7 +188,8 @@ shared_ptr tiledb::sm::FilterCreate::deserialize( filter_config.format, filter_config.lossless, filter_config.y_extent, - filter_config.x_extent); + filter_config.x_extent, + datatype); } else { throw WebpNotPresentError(); } @@ -188,8 +200,8 @@ shared_ptr tiledb::sm::FilterCreate::deserialize( } } shared_ptr tiledb::sm::FilterCreate::deserialize( - Deserializer& deserializer, const uint32_t version) { + Deserializer& deserializer, const uint32_t version, Datatype datatype) { EncryptionKey encryption_key; return tiledb::sm::FilterCreate::deserialize( - deserializer, encryption_key, version); + deserializer, encryption_key, version, datatype); } \ No newline at end of file diff --git a/tiledb/sm/filter/filter_create.h b/tiledb/sm/filter/filter_create.h index 68c0f174923..c60f13d52ba 100644 --- a/tiledb/sm/filter/filter_create.h +++ b/tiledb/sm/filter/filter_create.h @@ -56,22 +56,25 @@ class FilterCreate { * @param deserializer The deserializer to deserialize from. * @param encryption_key. * @param version Array schema version + * @param datatype Datatype this filter operates on within it's pipeline. * @return Filter */ static shared_ptr deserialize( Deserializer& deserializer, const EncryptionKey& encryption_key, - const uint32_t version); + const uint32_t version, + Datatype datatype); /** * Deserializes a new Filter instance from the data in the given buffer. 
* * @param buff The buffer to deserialize from. * @param version Array schema version + * @param datatype Datatype this filter operates on within it's pipeline. * @return Filter */ static shared_ptr deserialize( - Deserializer& deserializer, const uint32_t version); + Deserializer& deserializer, const uint32_t version, Datatype datatype); }; } // namespace tiledb::sm diff --git a/tiledb/sm/filter/filter_pipeline.cc b/tiledb/sm/filter/filter_pipeline.cc index f79bada1c2e..d91fa27ac1b 100644 --- a/tiledb/sm/filter/filter_pipeline.cc +++ b/tiledb/sm/filter/filter_pipeline.cc @@ -50,6 +50,12 @@ using namespace tiledb::common; namespace tiledb { namespace sm { +class FilterPipelineStatusException : public StatusException { + public: + explicit FilterPipelineStatusException(const std::string& msg) + : StatusException("FilterPipeline", msg) { + } +}; FilterPipeline::FilterPipeline() : max_chunk_size_(constants::max_tile_chunk_size) { @@ -94,6 +100,41 @@ void FilterPipeline::clear() { filters_.clear(); } +void FilterPipeline::check_filter_types( + const FilterPipeline& pipeline, + const Datatype first_input_type, + bool is_var) { + if (pipeline.filters_.empty()) { + return; + } + + if ((first_input_type == Datatype::STRING_ASCII || + first_input_type == Datatype::STRING_UTF8) && + is_var && pipeline.size() > 1) { + if (pipeline.has_filter(FilterType::FILTER_RLE) && + pipeline.get_filter(0)->type() != FilterType::FILTER_RLE) { + throw FilterPipelineStatusException( + "RLE filter must be the first filter to apply when used on a " + "variable length string attribute"); + } + if (pipeline.has_filter(FilterType::FILTER_DICTIONARY) && + pipeline.get_filter(0)->type() != FilterType::FILTER_DICTIONARY) { + throw FilterPipelineStatusException( + "Dictionary filter must be the first filter to apply when used on a " + "variable length string attribute"); + } + } + + // ** Modern checks using Filter output type ** + pipeline.get_filter(0)->ensure_accepts_datatype(first_input_type); + auto input_type = pipeline.get_filter(0)->output_datatype(first_input_type); + for (unsigned i = 1; i < pipeline.size(); ++i) { + ensure_compatible( + *pipeline.get_filter(i - 1), *pipeline.get_filter(i), input_type); + input_type = pipeline.get_filter(i)->output_datatype(input_type); + } +} + tuple>> FilterPipeline::get_var_chunk_sizes( uint32_t chunk_size, @@ -490,7 +531,8 @@ void FilterPipeline::serialize(Serializer& serializer) const { // as a no-op filter instead. 
auto as_compression = dynamic_cast(f.get()); if (as_compression != nullptr && f->type() == FilterType::FILTER_NONE) { - auto noop = tdb_unique_ptr(new NoopFilter); + auto noop = + tdb_unique_ptr(tdb_new(NoopFilter, Datatype::ANY)); noop->serialize(serializer); } else { f->serialize(serializer); @@ -499,13 +541,14 @@ void FilterPipeline::serialize(Serializer& serializer) const { } FilterPipeline FilterPipeline::deserialize( - Deserializer& deserializer, const uint32_t version) { + Deserializer& deserializer, const uint32_t version, Datatype datatype) { auto max_chunk_size = deserializer.read(); auto num_filters = deserializer.read(); std::vector> filters; for (uint32_t i = 0; i < num_filters; i++) { - auto filter{FilterCreate::deserialize(deserializer, version)}; + auto filter{FilterCreate::deserialize(deserializer, version, datatype)}; + datatype = filter->output_datatype(datatype); filters.push_back(std::move(filter)); } @@ -522,6 +565,17 @@ void FilterPipeline::dump(FILE* out) const { } } +void FilterPipeline::ensure_compatible( + const Filter& first, const Filter& second, Datatype first_input_type) { + auto first_output_type = first.output_datatype(first_input_type); + if (!second.accepts_input_datatype(first_output_type)) { + throw FilterPipelineStatusException( + "Filter " + filter_type_str(first.type()) + " produces " + + datatype_str(first_output_type) + " but second filter " + + filter_type_str(second.type()) + " does not accept this type."); + } +} + bool FilterPipeline::has_filter(const FilterType& filter_type) const { for (auto& f : filters_) { if (f->type() == filter_type) @@ -553,7 +607,8 @@ Status FilterPipeline::append_encryption_filter( case EncryptionType::NO_ENCRYPTION: return Status::Ok(); case EncryptionType::AES_256_GCM: - pipeline->add_filter(EncryptionAES256GCMFilter(encryption_key)); + pipeline->add_filter( + EncryptionAES256GCMFilter(encryption_key, Datatype::ANY)); return Status::Ok(); default: return LOG_STATUS(Status_FilterError( diff --git a/tiledb/sm/filter/filter_pipeline.h b/tiledb/sm/filter/filter_pipeline.h index 9919dfbd25b..a1b39bd60c5 100644 --- a/tiledb/sm/filter/filter_pipeline.h +++ b/tiledb/sm/filter/filter_pipeline.h @@ -100,6 +100,12 @@ class FilterPipeline { /** Clears the pipeline (removes all filters. */ void clear(); + /** Checks that all filters in a pipeline have compatible types */ + static void check_filter_types( + const FilterPipeline& pipeline, + const Datatype first_input_type, + bool is_var = false); + /** * Populates the filter pipeline from the data in the input binary buffer. * @@ -108,7 +114,7 @@ class FilterPipeline { * @return FilterPipeline */ static FilterPipeline deserialize( - Deserializer& deserializer, const uint32_t version); + Deserializer& deserializer, const uint32_t version, Datatype datatype); /** * Dumps the filter pipeline details in ASCII format in the selected @@ -116,6 +122,14 @@ class FilterPipeline { */ void dump(FILE* out) const; + /** + * Checks that two filters have compatible input / output types. + * Checks fail if the first filter outputs a type not accepted by the second + * filter as input. + */ + static void ensure_compatible( + const Filter& first, const Filter& second, Datatype first_input_type); + /** * Returns pointer to the first instance of a filter in the pipeline with the * given filter subclass type. 
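
For reviewers, a compact sketch of what `check_filter_types` now accepts and rejects (again assuming the internal headers from this change; the function name is illustrative): a FLOAT32 input filtered by FloatScale reaches DOUBLE_DELTA as the integer type FloatScale reports, so the pipeline below validates, while putting DOUBLE_DELTA first would throw.

```cpp
// Sketch only; assumes tiledb/sm/filter/{filter_pipeline,float_scaling_filter,
// compression_filter}.h from this change.
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/enums/filter_type.h"
#include "tiledb/sm/filter/compression_filter.h"
#include "tiledb/sm/filter/filter_pipeline.h"
#include "tiledb/sm/filter/float_scaling_filter.h"

using namespace tiledb::sm;

void pipeline_check_sketch() {
  FilterPipeline fp;
  // FloatScale: FLOAT32 in, 4-byte integer (INT32) out.
  fp.add_filter(FloatScalingFilter(4, 1.0, 0.0, Datatype::FLOAT32));
  // DOUBLE_DELTA consumes the INT32 produced by the previous filter.
  fp.add_filter(
      CompressionFilter(FilterType::FILTER_DOUBLE_DELTA, -1, Datatype::INT32));

  // Walks the pipeline, propagating each filter's output_datatype();
  // does not throw for this ordering.
  FilterPipeline::check_filter_types(fp, Datatype::FLOAT32);

  // With DOUBLE_DELTA first, the same call would throw, because the first
  // filter's ensure_accepts_datatype() rejects FLOAT32 directly.
}
```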
diff --git a/tiledb/sm/filter/float_scaling_filter.cc b/tiledb/sm/filter/float_scaling_filter.cc index ecf2187ae8a..5030bf1e45e 100644 --- a/tiledb/sm/filter/float_scaling_filter.cc +++ b/tiledb/sm/filter/float_scaling_filter.cc @@ -107,52 +107,43 @@ Status FloatScalingFilter::run_forward( FilterBuffer* output_metadata, FilterBuffer* output) const { switch (byte_width_) { - case sizeof(int8_t): { + case sizeof(int8_t): return run_forward( input_metadata, input, output_metadata, output); - } break; - case sizeof(int16_t): { + case sizeof(int16_t): return run_forward( input_metadata, input, output_metadata, output); - } break; - case sizeof(int32_t): { + case sizeof(int32_t): return run_forward( input_metadata, input, output_metadata, output); - } break; - case sizeof(int64_t): { + case sizeof(int64_t): return run_forward( input_metadata, input, output_metadata, output); - } break; - default: { + default: throw std::logic_error( "FloatScalingFilter::run_forward: byte_width_ does not reflect the " "size of an integer type."); - } } } Status FloatScalingFilter::run_forward( - const WriterTile& tile, + const WriterTile&, WriterTile* const, FilterBuffer* input_metadata, FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); switch (tile_type_size) { - case sizeof(float): { + case sizeof(float): return run_forward(input_metadata, input, output_metadata, output); - } break; - case sizeof(double): { + case sizeof(double): return run_forward( input_metadata, input, output_metadata, output); - } break; - default: { + default: throw std::logic_error( "FloatScalingFilter::run_forward: tile_type_size does not reflect " "the size of a floating point type."); - } } } @@ -227,7 +218,7 @@ Status FloatScalingFilter::run_reverse( } Status FloatScalingFilter::run_reverse( - const Tile& tile, + const Tile&, Tile*, FilterBuffer* input_metadata, FilterBuffer* input, @@ -235,8 +226,7 @@ Status FloatScalingFilter::run_reverse( FilterBuffer* output, const Config& config) const { (void)config; - auto tile_type = tile.type(); - auto tile_type_size = static_cast(datatype_size(tile_type)); + auto tile_type_size = static_cast(datatype_size(filter_data_type_)); switch (tile_type_size) { case sizeof(float): { return run_reverse(input_metadata, input, output_metadata, output); @@ -318,9 +308,31 @@ Status FloatScalingFilter::get_option_impl( return Status::Ok(); } +bool FloatScalingFilter::accepts_input_datatype(Datatype datatype) const { + size_t size = datatype_size(datatype); + return size == sizeof(float) || size == sizeof(double); +} + +Datatype FloatScalingFilter::output_datatype(Datatype) const { + if (byte_width_ == sizeof(int8_t)) { + return Datatype::INT8; + } else if (byte_width_ == sizeof(int16_t)) { + return Datatype::INT16; + } else if (byte_width_ == sizeof(int32_t)) { + return Datatype::INT32; + } else if (byte_width_ == sizeof(int64_t)) { + return Datatype::INT64; + } else { + throw std::logic_error( + "FloatScalingFilter::output_datatype: byte_width_ does not reflect " + "the size of an integer type."); + } +} + /** Returns a new clone of this filter. 
*/ FloatScalingFilter* FloatScalingFilter::clone_impl() const { - return tdb_new(FloatScalingFilter, byte_width_, scale_, offset_); + return tdb_new( + FloatScalingFilter, byte_width_, scale_, offset_, filter_data_type_); } } // namespace sm diff --git a/tiledb/sm/filter/float_scaling_filter.h b/tiledb/sm/filter/float_scaling_filter.h index effd5fd3a73..54bfa0f60bc 100644 --- a/tiledb/sm/filter/float_scaling_filter.h +++ b/tiledb/sm/filter/float_scaling_filter.h @@ -66,9 +66,11 @@ class FloatScalingFilter : public Filter { /** * Default constructor. Default settings for Float Scaling Filter are * scale = 1.0f, offset = 0.0f, and byte_width = 8. + * + * @param filter_data_type Datatype the filter will operate on. */ - FloatScalingFilter() - : Filter(FilterType::FILTER_SCALE_FLOAT) + FloatScalingFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_SCALE_FLOAT, filter_data_type) , scale_(1.0f) , offset_(0.0f) , byte_width_(8) { @@ -80,9 +82,14 @@ class FloatScalingFilter : public Filter { * @param byte_width The byte width of the compressed representation. * @param scale The scale factor. * @param offset The offset factor. + * @param filter_data_type Datatype the filter will operate on. */ - FloatScalingFilter(uint64_t byte_width, double scale, double offset) - : Filter(FilterType::FILTER_SCALE_FLOAT) + FloatScalingFilter( + uint64_t byte_width, + double scale, + double offset, + Datatype filter_data_type) + : Filter(FilterType::FILTER_SCALE_FLOAT, filter_data_type) , scale_(scale) , offset_(offset) , byte_width_(byte_width) { @@ -127,6 +134,22 @@ class FloatScalingFilter : public Filter { /** Gets an option from this filter. */ Status get_option_impl(FilterOption option, void* value) const override; + /** + * Checks if the filter is applicable to the input datatype. + * + * @param type Input datatype to check filter compatibility. + */ + bool accepts_input_datatype(Datatype datatype) const override; + + /** + * Returns the filter output type + * + * @param input_type Expected type used for input. Used for filters which + * change output type based on input data. e.g. XORFilter output type is + * based on byte width of input type. + */ + Datatype output_datatype(Datatype input_type) const override; + private: /** The scale factor. */ double scale_; diff --git a/tiledb/sm/filter/noop_filter.cc b/tiledb/sm/filter/noop_filter.cc index 194fab94b81..49277f795f0 100644 --- a/tiledb/sm/filter/noop_filter.cc +++ b/tiledb/sm/filter/noop_filter.cc @@ -40,12 +40,12 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -NoopFilter::NoopFilter() - : Filter(FilterType::FILTER_NONE) { +NoopFilter::NoopFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_NONE, filter_data_type) { } NoopFilter* NoopFilter::clone_impl() const { - return new NoopFilter; + return tdb_new(NoopFilter, filter_data_type_); } void NoopFilter::dump(FILE* out) const { diff --git a/tiledb/sm/filter/noop_filter.h b/tiledb/sm/filter/noop_filter.h index fdd949710fc..3e5bbc87f55 100644 --- a/tiledb/sm/filter/noop_filter.h +++ b/tiledb/sm/filter/noop_filter.h @@ -48,8 +48,10 @@ class NoopFilter : public Filter { public: /** * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. */ - NoopFilter(); + NoopFilter(Datatype filter_data_type); /** Dumps the filter details in ASCII format in the selected output. 
*/ void dump(FILE* out) const override; diff --git a/tiledb/sm/filter/positive_delta_filter.cc b/tiledb/sm/filter/positive_delta_filter.cc index 85ed05b8b76..784c2960620 100644 --- a/tiledb/sm/filter/positive_delta_filter.cc +++ b/tiledb/sm/filter/positive_delta_filter.cc @@ -44,13 +44,14 @@ using namespace tiledb::common; namespace tiledb { namespace sm { -PositiveDeltaFilter::PositiveDeltaFilter() - : Filter(FilterType::FILTER_POSITIVE_DELTA) { +PositiveDeltaFilter::PositiveDeltaFilter(Datatype filter_data_type) + : Filter(FilterType::FILTER_POSITIVE_DELTA, filter_data_type) { max_window_size_ = 1024; } -PositiveDeltaFilter::PositiveDeltaFilter(uint32_t max_window_size) - : Filter(FilterType::FILTER_POSITIVE_DELTA) +PositiveDeltaFilter::PositiveDeltaFilter( + uint32_t max_window_size, Datatype filter_data_type) + : Filter(FilterType::FILTER_POSITIVE_DELTA, filter_data_type) , max_window_size_(max_window_size) { } @@ -60,6 +61,10 @@ void PositiveDeltaFilter::dump(FILE* out) const { fprintf(out, "PositiveDelta: POSITIVE_DELTA_MAX_WINDOW=%u", max_window_size_); } +bool PositiveDeltaFilter::accepts_input_datatype(Datatype datatype) const { + return !datatype_is_real(datatype); +} + Status PositiveDeltaFilter::run_forward( const WriterTile& tile, WriterTile* const offsets_tile, @@ -67,10 +72,9 @@ Status PositiveDeltaFilter::run_forward( FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - // If encoding can't work, just return the input unmodified. - if (!datatype_is_integer(tile_type) && tile_type != Datatype::BLOB) { + if (!datatype_is_integer(filter_data_type_) && + filter_data_type_ != Datatype::BLOB) { RETURN_NOT_OK(output->append_view(input)); RETURN_NOT_OK(output_metadata->append_view(input_metadata)); return Status::Ok(); @@ -78,8 +82,8 @@ Status PositiveDeltaFilter::run_forward( /* Note: Arithmetic operations cannot be performed on std::byte. We will use uint8_t for the Datatype::BLOB case as it is the same size as - std::byte and can have arithmetic perfomed on it. */ - switch (tile_type) { + std::byte and can have arithmetic performed on it. */ + switch (filter_data_type_) { case Datatype::INT8: return run_forward( tile, offsets_tile, input_metadata, input, output_metadata, output); @@ -246,11 +250,9 @@ Status PositiveDeltaFilter::run_reverse( FilterBuffer* output, const Config& config) const { (void)config; - - auto tile_type = tile.type(); - // If encoding wasn't applied, just return the input unmodified. - if (!datatype_is_integer(tile_type) && tile_type != Datatype::BLOB) { + if (!datatype_is_integer(filter_data_type_) && + filter_data_type_ != Datatype::BLOB) { RETURN_NOT_OK(output->append_view(input)); RETURN_NOT_OK(output_metadata->append_view(input_metadata)); return Status::Ok(); @@ -259,7 +261,7 @@ Status PositiveDeltaFilter::run_reverse( /* Note: Arithmetic operations cannot be performed on std::byte. We will use uint8_t for the Datatype::BLOB case as it is the same size as std::byte and can have arithmetic perfomed on it. 
*/ - switch (tile_type) { + switch (filter_data_type_) { case Datatype::INT8: return run_reverse( tile, offsets_tile, input_metadata, input, output_metadata, output); @@ -318,14 +320,13 @@ Status PositiveDeltaFilter::run_reverse( template Status PositiveDeltaFilter::run_reverse( - const Tile& tile, + const Tile&, Tile* const, FilterBuffer* input_metadata, FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - auto tile_type_size = datatype_size(tile_type); + auto tile_type_size = datatype_size(filter_data_type_); uint32_t num_windows; RETURN_NOT_OK(input_metadata->read(&num_windows, sizeof(uint32_t))); @@ -406,7 +407,7 @@ void PositiveDeltaFilter::set_max_window_size(uint32_t max_window_size) { } PositiveDeltaFilter* PositiveDeltaFilter::clone_impl() const { - auto clone = new PositiveDeltaFilter; + auto clone = tdb_new(PositiveDeltaFilter, filter_data_type_); clone->max_window_size_ = max_window_size_; return clone; } diff --git a/tiledb/sm/filter/positive_delta_filter.h b/tiledb/sm/filter/positive_delta_filter.h index a73a5be578a..01fcb3e51d8 100644 --- a/tiledb/sm/filter/positive_delta_filter.h +++ b/tiledb/sm/filter/positive_delta_filter.h @@ -73,14 +73,20 @@ namespace sm { */ class PositiveDeltaFilter : public Filter { public: - /** Constructor. */ - PositiveDeltaFilter(); + /** + * Constructor. + * + * @param filter_data_type Datatype the filter will operate on. + */ + PositiveDeltaFilter(Datatype filter_data_type); - /** Constructor. + /** + * Constructor. * - * @param max_window_size + * @param max_window_size Window size in bytes to apply positive delta filter. + * @param filter_data_type Datatype the filter will operate on. */ - PositiveDeltaFilter(uint32_t max_window_size); + PositiveDeltaFilter(uint32_t max_window_size, Datatype filter_data_type); /** Return the max window size used by the filter. */ uint32_t max_window_size() const; @@ -88,6 +94,13 @@ class PositiveDeltaFilter : public Filter { /** Dumps the filter details in ASCII format in the selected output. */ void dump(FILE* out) const override; + /** + * Checks if the filter is applicable to the input datatype. + * + * @param type Input datatype to check filter compatibility. + */ + bool accepts_input_datatype(Datatype datatype) const override; + /** * Perform positive-delta encoding of the given input into the given output. 
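
The positive-delta filter now advertises which types it can encode. A minimal sketch of the new check (same assumption that the internal headers from this change are on the include path; the function name is illustrative):

```cpp
// Sketch only; assumes tiledb/sm/filter/positive_delta_filter.h from this
// change.
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/filter/positive_delta_filter.h"

using tiledb::sm::Datatype;
using tiledb::sm::PositiveDeltaFilter;

void positive_delta_sketch() {
  PositiveDeltaFilter pd(Datatype::UINT64);
  bool ints = pd.accepts_input_datatype(Datatype::UINT64);    // true
  bool reals = pd.accepts_input_datatype(Datatype::FLOAT64);  // false
  // pd.ensure_accepts_datatype(Datatype::FLOAT64) would now throw up front,
  // rather than the filter silently passing real-valued data through at
  // run time.
  (void)ints;
  (void)reals;
}
```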
*/ diff --git a/tiledb/sm/filter/test/compile_all_filters_main.cc b/tiledb/sm/filter/test/compile_all_filters_main.cc index d8930f462df..a251fbe7b44 100644 --- a/tiledb/sm/filter/test/compile_all_filters_main.cc +++ b/tiledb/sm/filter/test/compile_all_filters_main.cc @@ -33,7 +33,7 @@ int main() { (void)sizeof(tiledb::sm::FilterCreate); (void)static_cast (*)( - Deserializer & deserializer, const uint32_t version)>( + Deserializer & deserializer, const uint32_t version, Datatype datatype)>( tiledb::sm::FilterCreate::deserialize); return 0; } \ No newline at end of file diff --git a/tiledb/sm/filter/test/compile_webp_filter_main.cc b/tiledb/sm/filter/test/compile_webp_filter_main.cc index cea2e51b20f..77f9933c562 100644 --- a/tiledb/sm/filter/test/compile_webp_filter_main.cc +++ b/tiledb/sm/filter/test/compile_webp_filter_main.cc @@ -29,6 +29,6 @@ #include "tiledb/sm/filter/webp_filter.h" int main() { - tiledb::sm::WebpFilter(); + (void)sizeof(tiledb::sm::WebpFilter); return 0; } diff --git a/tiledb/sm/filter/test/unit_filter_create.cc b/tiledb/sm/filter/test/unit_filter_create.cc index 806c9311749..e2f818b530d 100644 --- a/tiledb/sm/filter/test/unit_filter_create.cc +++ b/tiledb/sm/filter/test/unit_filter_create.cc @@ -72,8 +72,8 @@ TEST_CASE( buffer_offset(p) = max_window_size0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -95,8 +95,8 @@ TEST_CASE( buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -112,8 +112,8 @@ TEST_CASE( buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -129,8 +129,8 @@ TEST_CASE( buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -146,8 +146,8 @@ TEST_CASE( buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -163,8 +163,8 @@ TEST_CASE( buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); 
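
The unit tests above hand `FilterCreate::deserialize` an explicit datatype; the pipeline deserializer earlier in this diff chains those types through `output_datatype`. A small sketch of that propagation using filters from this change (not code from the patch itself; the function name is illustrative):

```cpp
// Sketch only; assumes the internal headers from this change.
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/filter/float_scaling_filter.h"
#include "tiledb/sm/filter/xor_filter.h"

using tiledb::sm::Datatype;
using tiledb::sm::FloatScalingFilter;
using tiledb::sm::XORFilter;

void type_propagation_sketch() {
  // A FLOAT64 attribute filtered by FloatScale (byte_width 2) then XOR.
  FloatScalingFilter fs(2, 1.0, 0.0, Datatype::FLOAT64);
  Datatype after_fs = fs.output_datatype(Datatype::FLOAT64);  // INT16

  // The XOR filter is therefore constructed for INT16 - exactly what
  // FilterPipeline::deserialize now does when it threads the datatype from
  // one deserialized filter to the next.
  XORFilter x(after_fs);
  Datatype after_xor = x.output_datatype(after_fs);  // INT16
  (void)after_xor;
}
```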
@@ -197,8 +197,8 @@ TEST_CASE( buffer_offset(p) = static_cast(compressor0); Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -218,8 +218,8 @@ TEST_CASE( buffer_offset(p) = level0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -246,8 +246,8 @@ TEST_CASE( buffer_offset(p) = level0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -274,8 +274,8 @@ TEST_CASE( buffer_offset(p) = level0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -302,8 +302,8 @@ TEST_CASE( buffer_offset(p) = level0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -330,8 +330,8 @@ TEST_CASE( buffer_offset(p) = static_cast(reinterpret_type0); Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; CHECK(filter1->type() == filter_type); Datatype reinterpret_type1; @@ -353,8 +353,8 @@ TEST_CASE("Filter: Test noop filter deserialization", "[filter][noop]") { buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -371,8 +371,8 @@ TEST_CASE( buffer_offset(p) = sizeof(uint32_t); // metadata_length buffer_offset(p) = max_window_size0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -405,8 +405,8 @@ TEST_CASE( buffer_offset(p) = byte_width0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::FLOAT32)}; CHECK(filter1->type() == 
filtertype0); double scale1 = 0.0; REQUIRE(filter1->get_option(FilterOption::SCALE_FLOAT_FACTOR, &scale1).ok()); @@ -431,8 +431,8 @@ TEST_CASE("Filter: Test XOR filter deserialization", "[filter][xor]") { buffer_offset(p) = 0; // metadata_length Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter1{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter1{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::ANY)}; // Check type CHECK(filter1->type() == filtertype0); @@ -466,8 +466,8 @@ TEST_CASE("Filter: Test WEBP filter deserialization", "[filter][webp]") { buffer_offset(p) = x0; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filter{ - FilterCreate::deserialize(deserializer, constants::format_version)}; + auto filter{FilterCreate::deserialize( + deserializer, constants::format_version, Datatype::UINT8)}; CHECK(filter->type() == filterType); diff --git a/tiledb/sm/filter/test/unit_filter_pipeline.cc b/tiledb/sm/filter/test/unit_filter_pipeline.cc index 86eecb8a932..41407495019 100644 --- a/tiledb/sm/filter/test/unit_filter_pipeline.cc +++ b/tiledb/sm/filter/test/unit_filter_pipeline.cc @@ -103,8 +103,8 @@ TEST_CASE( filters_buffer_offset(p) = compressor_level3; Deserializer deserializer(&serialized_buffer, sizeof(serialized_buffer)); - auto filters{ - FilterPipeline::deserialize(deserializer, constants::format_version)}; + auto filters{FilterPipeline::deserialize( + deserializer, constants::format_version, Datatype::INT32)}; CHECK(filters.max_chunk_size() == max_chunk_size); CHECK(filters.size() == num_filters); @@ -128,10 +128,10 @@ TEST_CASE( TEST_CASE( "FilterPipeline: Test if filter list has a filter", "[filter-pipeline]") { FilterPipeline fp; - fp.add_filter(CompressionFilter(Compressor::ZSTD, 2)); - fp.add_filter(BitWidthReductionFilter()); - fp.add_filter(CompressionFilter(Compressor::RLE, 1)); - fp.add_filter(CompressionFilter(Compressor::LZ4, 1)); + fp.add_filter(CompressionFilter(Compressor::ZSTD, 2, Datatype::ANY)); + fp.add_filter(BitWidthReductionFilter(Datatype::ANY)); + fp.add_filter(CompressionFilter(Compressor::RLE, 1, Datatype::ANY)); + fp.add_filter(CompressionFilter(Compressor::LZ4, 1, Datatype::ANY)); // Check that filters are searched correctly CHECK(fp.has_filter(FilterType::FILTER_RLE)); @@ -156,14 +156,14 @@ TEST_CASE( // pipeline that contains an RLE or Dictionary compressor FilterPipeline fp_with; - fp_with.add_filter(CompressionFilter(Compressor::ZSTD, 2)); - fp_with.add_filter(BitWidthReductionFilter()); - fp_with.add_filter(CompressionFilter(f, 1)); + fp_with.add_filter(CompressionFilter(Compressor::ZSTD, 2, Datatype::ANY)); + fp_with.add_filter(BitWidthReductionFilter(Datatype::ANY)); + fp_with.add_filter(CompressionFilter(f, 1, Datatype::ANY)); // pipeline that doesn't contain an RLE or Dictionary compressor FilterPipeline fp_without; - fp_without.add_filter(CompressionFilter(Compressor::ZSTD, 2)); - fp_without.add_filter(BitWidthReductionFilter()); + fp_without.add_filter(CompressionFilter(Compressor::ZSTD, 2, Datatype::ANY)); + fp_without.add_filter(BitWidthReductionFilter(Datatype::ANY)); bool is_var_sized = true; @@ -197,7 +197,7 @@ TEST_CASE( // pipeline that contains an RLE or Dictionary compressor FilterPipeline fp; - fp.add_filter(CompressionFilter(filter, 1)); + fp.add_filter(CompressionFilter(filter, 1, Datatype::ANY)); fp.set_max_chunk_size(0); CHECK_FALSE(fp.use_tile_chunking(true, 0, Datatype::INT32)); @@ -216,14 +216,14 
@@ TEST_CASE( // pipeline that contains an RLE or Dictionary compressor FilterPipeline fp_with; - fp_with.add_filter(CompressionFilter(Compressor::ZSTD, 2)); - fp_with.add_filter(BitWidthReductionFilter()); - fp_with.add_filter(CompressionFilter(f, 1)); + fp_with.add_filter(CompressionFilter(Compressor::ZSTD, 2, Datatype::ANY)); + fp_with.add_filter(BitWidthReductionFilter(Datatype::ANY)); + fp_with.add_filter(CompressionFilter(f, 1, Datatype::ANY)); // pipeline that doesn't contain an RLE or Dictionary compressor FilterPipeline fp_without; - fp_without.add_filter(CompressionFilter(Compressor::ZSTD, 2)); - fp_without.add_filter(BitWidthReductionFilter()); + fp_without.add_filter(CompressionFilter(Compressor::ZSTD, 2, Datatype::ANY)); + fp_without.add_filter(BitWidthReductionFilter(Datatype::ANY)); // Do not filter offsets if RLE is used for var-sized strings for schema // version >= 12 or Dictionary for version >=13 diff --git a/tiledb/sm/filter/test/unit_float_scale_input_validation.cc b/tiledb/sm/filter/test/unit_float_scale_input_validation.cc index a59e7370974..e0e4ed8e1a1 100644 --- a/tiledb/sm/filter/test/unit_float_scale_input_validation.cc +++ b/tiledb/sm/filter/test/unit_float_scale_input_validation.cc @@ -37,6 +37,7 @@ #include "../../../common/common.h" #include "../../enums/filter_option.h" #include "../float_scaling_filter.h" +#include "tiledb/sm/enums/datatype.h" using namespace tiledb::sm; @@ -56,14 +57,14 @@ void check_input_values() { void check_throw_message( FilterOption option, double* value, const std::string& message) { - FloatScalingFilter filter; + FloatScalingFilter filter(Datatype::FLOAT32); Status status = filter.set_option_impl(option, value); CHECK(!status.ok()); CHECK(status.message() == message); } void check_ok(FilterOption option, double* value) { - FloatScalingFilter filter; + FloatScalingFilter filter(Datatype::FLOAT32); Status status = filter.set_option_impl(option, value); CHECK(status.ok()); } diff --git a/tiledb/sm/filter/webp_filter.cc b/tiledb/sm/filter/webp_filter.cc index 8a45fcb1bf6..9ac57a93558 100644 --- a/tiledb/sm/filter/webp_filter.cc +++ b/tiledb/sm/filter/webp_filter.cc @@ -40,6 +40,10 @@ void WebpFilter::dump(FILE* out) const { out = stdout; fprintf(out, "WebpFilter"); } + +bool WebpFilter::accepts_input_datatype(Datatype datatype) const { + return datatype == Datatype::UINT8; +} } // namespace tiledb::sm #ifndef TILEDB_WEBP @@ -213,14 +217,14 @@ Status WebpFilter::run_forward( } Status WebpFilter::run_reverse( - const Tile& tile, + const Tile&, Tile* const, FilterBuffer* input_metadata, FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output, const Config&) const { - if (tile.type() != Datatype::UINT8) { + if (filter_data_type_ != Datatype::UINT8) { throw StatusException(Status_FilterError("Unsupported input type")); } return run_reverse(input_metadata, input, output_metadata, output); @@ -352,7 +356,8 @@ Status WebpFilter::get_option_impl(FilterOption option, void* value) const { format_, lossless_, extents_.first, - extents_.second); + extents_.second, + filter_data_type_); } void WebpFilter::serialize_impl(Serializer& serializer) const { diff --git a/tiledb/sm/filter/webp_filter.h b/tiledb/sm/filter/webp_filter.h index 85453b0330c..5842f6351be 100644 --- a/tiledb/sm/filter/webp_filter.h +++ b/tiledb/sm/filter/webp_filter.h @@ -107,27 +107,35 @@ class WebpFilter : public Filter { /* ********************************* */ /** + * Constructor. * Default setting for webp quality factor is 100.0 for lossy compression. 
* Caller must set colorspace format filter option. + * + * @param filter_data_type Datatype the filter will operate on. */ - WebpFilter() - : WebpFilter(100.0f, WebpInputFormat::WEBP_NONE, false, 0, 0) { + WebpFilter(Datatype filter_data_type) + : WebpFilter( + 100.0f, WebpInputFormat::WEBP_NONE, false, 0, 0, filter_data_type) { } /** + * Constructor. + * * @param quality Quality factor to use for WebP lossy compression. * @param inputFormat Colorspace format to use for WebP compression. * @param lossless Enable lossless compression. * @param y_extent Extent at dimension index 0. * @param x_extent Extent at dimension index 1. + * @param filter_data_type Datatype the filter will operate on. */ WebpFilter( float quality, WebpInputFormat inputFormat, bool lossless, uint16_t y_extent, - uint16_t x_extent) - : Filter(FilterType::FILTER_WEBP) + uint16_t x_extent, + Datatype filter_data_type) + : Filter(FilterType::FILTER_WEBP, filter_data_type) , quality_(quality) , format_(inputFormat) , lossless_(lossless) @@ -144,6 +152,13 @@ class WebpFilter : public Filter { */ void dump(FILE* out) const override; + /** + * Checks if the filter is applicable to the input datatype. + * + * @param type Input datatype to check filter compatibility. + */ + bool accepts_input_datatype(Datatype datatype) const override; + /** * Runs the filter forward, taking raw colorspace values as input and writing. * encoded WebP data to the TileDB Array. diff --git a/tiledb/sm/filter/xor_filter.cc b/tiledb/sm/filter/xor_filter.cc index f9917b9b7a1..2f44ecdf49a 100644 --- a/tiledb/sm/filter/xor_filter.cc +++ b/tiledb/sm/filter/xor_filter.cc @@ -48,19 +48,46 @@ void XORFilter::dump(FILE* out) const { fprintf(out, "XORFilter"); } +bool XORFilter::accepts_input_datatype(Datatype datatype) const { + switch (datatype_size(datatype)) { + case sizeof(int8_t): + case sizeof(int16_t): + case sizeof(int32_t): + case sizeof(int64_t): + return true; + default: + return false; + } +} + +Datatype XORFilter::output_datatype(tiledb::sm::Datatype input_type) const { + switch (datatype_size(input_type)) { + case sizeof(int8_t): + return Datatype::INT8; + case sizeof(int16_t): + return Datatype::INT16; + case sizeof(int32_t): + return Datatype::INT32; + case sizeof(int64_t): + return Datatype::INT64; + default: + throw StatusException(Status_FilterError( + "XORFilter::output_datatype: datatype size cannot be converted to " + "integer type.")); + } +} + Status XORFilter::run_forward( - const WriterTile& tile, + const WriterTile&, WriterTile* const, FilterBuffer* input_metadata, FilterBuffer* input, FilterBuffer* output_metadata, FilterBuffer* output) const { - auto tile_type = tile.type(); - // Since run_forward interprets the filter's data as integers, we case on // the size of the type and pass in the corresponding integer type into // a templated function. - switch (datatype_size(tile_type)) { + switch (datatype_size(filter_data_type_)) { case sizeof(int8_t): { return run_forward( input_metadata, input, output_metadata, output); @@ -148,7 +175,7 @@ Status XORFilter::xor_part(const ConstBuffer* part, Buffer* output) const { } Status XORFilter::run_reverse( - const Tile& tile, + const Tile&, Tile*, FilterBuffer* input_metadata, FilterBuffer* input, @@ -160,8 +187,7 @@ Status XORFilter::run_reverse( // Since run_reverse interprets the filter's data as integers, we case on // the size of the type and pass in the corresponding integer type into // a templated function. 
-  auto tile_type = tile.type();
-  switch (datatype_size(tile_type)) {
+  switch (datatype_size(filter_data_type_)) {
     case sizeof(int8_t): {
       return run_reverse<int8_t>(
           input_metadata, input, output_metadata, output);
     }
@@ -257,7 +283,7 @@ Status XORFilter::unxor_part(const ConstBuffer* part, Buffer* output) const {

 /** Returns a new clone of this filter. */
 XORFilter* XORFilter::clone_impl() const {
-  return tdb_new(XORFilter);
+  return tdb_new(XORFilter, filter_data_type_);
 }

 }  // namespace sm
diff --git a/tiledb/sm/filter/xor_filter.h b/tiledb/sm/filter/xor_filter.h
index 0a12c137485..2910cb926b5 100644
--- a/tiledb/sm/filter/xor_filter.h
+++ b/tiledb/sm/filter/xor_filter.h
@@ -58,15 +58,33 @@ namespace sm {
 class XORFilter : public Filter {
  public:
   /**
-   * Default constructor.
+   * Constructor.
+   *
+   * @param filter_data_type Datatype the filter will operate on.
    */
-  XORFilter()
-      : Filter(FilterType::FILTER_XOR) {
+  XORFilter(Datatype filter_data_type)
+      : Filter(FilterType::FILTER_XOR, filter_data_type) {
   }

   /** Dumps the filter details in ASCII format in the selected output. */
   void dump(FILE* out) const override;

+  /**
+   * Checks if the filter is applicable to the input datatype.
+   *
+   * @param datatype Input datatype to check filter compatibility.
+   */
+  bool accepts_input_datatype(Datatype datatype) const override;
+
+  /**
+   * Returns the filter output type.
+   *
+   * @param input_type Expected type used for input. Used for filters which
+   * change output type based on input data, e.g. the XORFilter output type is
+   * based on the byte width of the input type.
+   */
+  Datatype output_datatype(Datatype input_type) const override;
+
   /**
    * Run forward. Takes input data parts, and per part it stores the first
    * element in the part, and then the differences of each consecutive pair
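The XOR filter's typing is now driven entirely by byte width: any 1-, 2-, 4-, or 8-byte input is accepted, and the reported output type is the signed integer of the same width, which is what the next filter in a pipeline is handed. Below is a small sketch of that contract; it assumes only the `XORFilter` declarations added above and is for illustration, not part of the patch.

```cpp
// Illustrative only: width-based typing exposed by the XOR filter above.
#include <cassert>

#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/filter/xor_filter.h"

using tiledb::sm::Datatype;
using tiledb::sm::XORFilter;

int main() {
  // FLOAT64 is 8 bytes wide, so the XOR filter treats it as INT64.
  XORFilter xor_filter(Datatype::FLOAT64);

  // Any 1-, 2-, 4-, or 8-byte input is acceptable...
  assert(xor_filter.accepts_input_datatype(Datatype::FLOAT64));

  // ...and the output type is the signed integer of the same byte width,
  // which is the datatype seen by the next filter in the pipeline.
  assert(xor_filter.output_datatype(Datatype::FLOAT64) == Datatype::INT64);
  return 0;
}
```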
diff --git a/tiledb/sm/serialization/array_schema.cc b/tiledb/sm/serialization/array_schema.cc
index 1b8a005956d..827eec3532a 100644
--- a/tiledb/sm/serialization/array_schema.cc
+++ b/tiledb/sm/serialization/array_schema.cc
@@ -181,7 +181,7 @@ Status filter_pipeline_to_capnp(
 }

 tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
-    const capnp::Filter::Reader& filter_reader) {
+    const capnp::Filter::Reader& filter_reader, Datatype datatype) {
   FilterType type = FilterType::FILTER_NONE;
   RETURN_NOT_OK_TUPLE(
       filter_type_enum(filter_reader.getType().cStr(), &type), nullopt);
@@ -192,14 +192,16 @@ tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
       uint32_t window = data.getUint32();
       return {
           Status::Ok(),
-          tiledb::common::make_shared<BitWidthReductionFilter>(HERE(), window)};
+          tiledb::common::make_shared<BitWidthReductionFilter>(
+              HERE(), window, datatype)};
     }
     case FilterType::FILTER_POSITIVE_DELTA: {
       auto data = filter_reader.getData();
       uint32_t window = data.getUint32();
       return {
           Status::Ok(),
-          tiledb::common::make_shared<PositiveDeltaFilter>(HERE(), window)};
+          tiledb::common::make_shared<PositiveDeltaFilter>(
+              HERE(), window, datatype)};
     }
     case FilterType::FILTER_GZIP:
     case FilterType::FILTER_ZSTD:
@@ -213,7 +215,8 @@ tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
       int32_t level = data.getInt32();
       return {
           Status::Ok(),
-          tiledb::common::make_shared<CompressionFilter>(HERE(), type, level)};
+          tiledb::common::make_shared<CompressionFilter>(
+              HERE(), type, level, datatype)};
     }
     case FilterType::FILTER_SCALE_FLOAT: {
       if (filter_reader.hasFloatScaleConfig()) {
@@ -224,40 +227,48 @@ tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
         return {
             Status::Ok(),
             tiledb::common::make_shared<FloatScalingFilter>(
-                HERE(), byte_width, scale, offset)};
+                HERE(), byte_width, scale, offset, datatype)};
       }
       return {
           Status::Ok(),
-          tiledb::common::make_shared<FloatScalingFilter>(HERE())};
+          tiledb::common::make_shared<FloatScalingFilter>(HERE(), datatype)};
     }
     case FilterType::FILTER_NONE: {
-      return {Status::Ok(),
-              tiledb::common::make_shared<NoopFilter>(HERE())};
+      return {
+          Status::Ok(),
+          tiledb::common::make_shared<NoopFilter>(HERE(), datatype)};
     }
     case FilterType::FILTER_BITSHUFFLE: {
       return {
-          Status::Ok(), tiledb::common::make_shared<BitshuffleFilter>(HERE())};
+          Status::Ok(),
+          tiledb::common::make_shared<BitshuffleFilter>(HERE(), datatype)};
     }
     case FilterType::FILTER_BYTESHUFFLE: {
       return {
-          Status::Ok(), tiledb::common::make_shared<ByteshuffleFilter>(HERE())};
+          Status::Ok(),
+          tiledb::common::make_shared<ByteshuffleFilter>(HERE(), datatype)};
     }
     case FilterType::FILTER_CHECKSUM_MD5: {
       return {
-          Status::Ok(), tiledb::common::make_shared<ChecksumMD5Filter>(HERE())};
+          Status::Ok(),
+          tiledb::common::make_shared<ChecksumMD5Filter>(HERE(), datatype)};
     }
     case FilterType::FILTER_CHECKSUM_SHA256: {
       return {
           Status::Ok(),
-          tiledb::common::make_shared<ChecksumSHA256Filter>(HERE())};
+          tiledb::common::make_shared<ChecksumSHA256Filter>(HERE(), datatype)};
     }
     case FilterType::INTERNAL_FILTER_AES_256_GCM: {
       return {
           Status::Ok(),
-          tiledb::common::make_shared<EncryptionAES256GCMFilter>(HERE())};
+          tiledb::common::make_shared<EncryptionAES256GCMFilter>(
+              HERE(), datatype)};
     }
     case FilterType::FILTER_XOR: {
-      return {Status::Ok(), tiledb::common::make_shared<XORFilter>(HERE())};
+      return {
+          Status::Ok(),
+          tiledb::common::make_shared<XORFilter>(HERE(), datatype)};
     }
     case FilterType::FILTER_WEBP: {
       if constexpr (webp_filter_exists) {
@@ -271,10 +282,17 @@ tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
         return {
             Status::Ok(),
             tiledb::common::make_shared<WebpFilter>(
-                HERE(), quality, format, lossless, extent_x, extent_y)};
+                HERE(),
+                quality,
+                format,
+                lossless,
+                extent_x,
+                extent_y,
+                datatype)};
       } else {
         return {
-            Status::Ok(), tiledb::common::make_shared<WebpFilter>(HERE())};
+            Status::Ok(),
+            tiledb::common::make_shared<WebpFilter>(HERE(), datatype)};
       }
     } else {
       throw WebpNotPresentError();
@@ -295,15 +313,20 @@ tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
 }

 tuple<Status, optional<shared_ptr<FilterPipeline>>> filter_pipeline_from_capnp(
-    const capnp::FilterPipeline::Reader& filter_pipeline_reader) {
+    const capnp::FilterPipeline::Reader& filter_pipeline_reader,
+    Datatype datatype) {
   if (!filter_pipeline_reader.hasFilters())
     return {Status::Ok(), make_shared<FilterPipeline>(HERE())};

   std::vector<shared_ptr<Filter>> filter_list;
   auto filter_list_reader = filter_pipeline_reader.getFilters();
   for (auto filter_reader : filter_list_reader) {
-    auto&& [st_f, filter]{filter_from_capnp(filter_reader)};
+    // Deserialize and initialize the filter with its datatype within the
+    // pipeline.
+    auto&& [st_f, filter]{filter_from_capnp(filter_reader, datatype)};
     RETURN_NOT_OK_TUPLE(st_f, nullopt);
+    // Update the datatype to pass to the next filter in the pipeline.
+    datatype = filter.value()->output_datatype(datatype);
     filter_list.push_back(filter.value());
   }
@@ -376,7 +399,8 @@ shared_ptr<Attribute> attribute_from_capnp(
   shared_ptr<FilterPipeline> filters{};
   if (attribute_reader.hasFilterPipeline()) {
     auto filter_pipeline_reader = attribute_reader.getFilterPipeline();
-    auto&& [st_fp, f]{filter_pipeline_from_capnp(filter_pipeline_reader)};
+    auto&& [st_fp, f]{
+        filter_pipeline_from_capnp(filter_pipeline_reader, datatype)};
     throw_if_not_ok(st_fp);
     filters = f.value();
   } else {
@@ -612,7 +636,7 @@ shared_ptr<Dimension> dimension_from_capnp(
   shared_ptr<FilterPipeline> filters{};
   if (dimension_reader.hasFilterPipeline()) {
     auto reader = dimension_reader.getFilterPipeline();
-    auto&& [st_fp, f]{filter_pipeline_from_capnp(reader)};
+    auto&& [st_fp, f]{filter_pipeline_from_capnp(reader, dim_type)};
     if (!st_fp.ok()) {
       throw std::runtime_error(
           "[Deserialization::dimension_from_capnp] Failed to deserialize "
@@ -1013,7 +1037,7 @@ ArraySchema array_schema_from_capnp(
   FilterPipeline coords_filters;
   if (schema_reader.hasCoordsFilterPipeline()) {
     auto reader = schema_reader.getCoordsFilterPipeline();
-    auto&& [st_fp, filters]{filter_pipeline_from_capnp(reader)};
+    auto&& [st_fp, filters]{filter_pipeline_from_capnp(reader, Datatype::ANY)};
     if (!st_fp.ok()) {
       throw std::runtime_error(
           "[Deserialization::array_schema_from_capnp] Cannot deserialize "
@@ -1028,7 +1052,8 @@ ArraySchema array_schema_from_capnp(
   FilterPipeline cell_var_offsets_filters;
   if (schema_reader.hasOffsetFilterPipeline()) {
     auto reader = schema_reader.getOffsetFilterPipeline();
-    auto&& [st_fp, filters]{filter_pipeline_from_capnp(reader)};
+    auto&& [st_fp, filters]{
+        filter_pipeline_from_capnp(reader, Datatype::UINT64)};
     if (!st_fp.ok()) {
       throw std::runtime_error(
           "[Deserialization::array_schema_from_capnp] Cannot deserialize "
@@ -1043,7 +1068,8 @@ ArraySchema array_schema_from_capnp(
   FilterPipeline cell_validity_filters;
   if (schema_reader.hasValidityFilterPipeline()) {
     auto reader = schema_reader.getValidityFilterPipeline();
-    auto&& [st_fp, filters]{filter_pipeline_from_capnp(reader)};
+    auto&& [st_fp, filters]{
+        filter_pipeline_from_capnp(reader, Datatype::UINT8)};
     if (!st_fp.ok()) {
       throw std::runtime_error(
           "[Deserialization::array_schema_from_capnp] Cannot deserialize "
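Deserialization now threads a datatype through each pipeline: the attribute or dimension type seeds the first filter, each filter's `output_datatype` feeds the next, and the fixed pipelines use `Datatype::ANY` (coords), `UINT64` (offsets), and `UINT8` (validity). The following is a minimal sketch of that propagation with the Cap'n Proto plumbing stripped out; the `propagate` helper is hypothetical and only illustrates the loop above.

```cpp
// Illustrative only: datatype propagation through a filter pipeline.
#include <memory>
#include <vector>

#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/filter/filter.h"

using tiledb::sm::Datatype;
using tiledb::sm::Filter;

// Each filter is constructed with the datatype produced by the previous
// filter; the first one sees the attribute/dimension type (or UINT64 for
// offsets, UINT8 for validity, ANY for the legacy coords pipeline).
Datatype propagate(
    const std::vector<std::shared_ptr<Filter>>& filters, Datatype datatype) {
  for (const auto& f : filters) {
    // ...a real deserializer would build the filter with `datatype` here...
    datatype = f->output_datatype(datatype);
  }
  return datatype;  // type of the tile data after the whole pipeline
}
```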
diff --git a/tiledb/sm/serialization/array_schema.h b/tiledb/sm/serialization/array_schema.h
index 53b8be082c0..01aa4a3788c 100644
--- a/tiledb/sm/serialization/array_schema.h
+++ b/tiledb/sm/serialization/array_schema.h
@@ -69,10 +69,11 @@ Status filter_to_capnp(
  * Deserialize a filter from a cap'n proto object
  *
  * @param filter_reader Cap'n proto object
+ * @param datatype Datatype the filter operates on within its pipeline.
  * @return Status
  */
 tuple<Status, optional<shared_ptr<Filter>>> filter_from_capnp(
-    const capnp::Filter::Reader& filter_reader);
+    const capnp::Filter::Reader& filter_reader, Datatype datatype);

 /**
  * Serialize an array schema to cap'n proto object
diff --git a/tiledb/sm/serialization/test/unit_capnp_utils.cc b/tiledb/sm/serialization/test/unit_capnp_utils.cc
index 3aef1a1851d..b2e67ccc797 100644
--- a/tiledb/sm/serialization/test/unit_capnp_utils.cc
+++ b/tiledb/sm/serialization/test/unit_capnp_utils.cc
@@ -121,8 +121,8 @@ TEST_CASE("Serialize and deserialize attribute", "[attribute][serialization]") {
   SECTION("Non-default filters pipeline") {
     attr = make_shared<Attribute>(HERE(), "attr1", Datatype::UINT64);
     FilterPipeline filters;
-    filters.add_filter(CompressionFilter(Compressor::ZSTD, 2));
-    filters.add_filter(BitWidthReductionFilter());
+    filters.add_filter(CompressionFilter(Compressor::ZSTD, 2, attr->type()));
+    filters.add_filter(BitWidthReductionFilter(attr->type()));
     attr->set_filter_pipeline(filters);
   }
   SECTION("Multiple cell values") {
diff --git a/tiledb/sm/tile/generic_tile_io.cc b/tiledb/sm/tile/generic_tile_io.cc
index ff07088bef6..6c79d61a866 100644
--- a/tiledb/sm/tile/generic_tile_io.cc
+++ b/tiledb/sm/tile/generic_tile_io.cc
@@ -164,7 +164,9 @@ GenericTileIO::GenericTileHeader GenericTileIO::read_generic_tile_header(
   Deserializer filter_pipeline_deserializer(
       filter_pipeline_buf.data(), filter_pipeline_buf.size());
   auto filterpipeline{FilterPipeline::deserialize(
-      filter_pipeline_deserializer, header.version_number)};
+      filter_pipeline_deserializer,
+      header.version_number,
+      static_cast<Datatype>(header.datatype))};
   header.filters = std::move(filterpipeline);

   return header;
@@ -258,7 +260,8 @@ Status GenericTileIO::init_generic_tile_header(
   header->filters.add_filter(CompressionFilter(
       constants::generic_tile_compressor,
-      constants::generic_tile_compression_level));
+      constants::generic_tile_compression_level,
+      tile->type()));

   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
       &header->filters, encryption_key));