diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index 208c21c2dc0..0f05dcb4bb3 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -118,10 +118,9 @@ test: - test -f $PREFIX/include/cudf/hashing.hpp - test -f $PREFIX/include/cudf/interop.hpp - test -f $PREFIX/include/cudf/io/avro.hpp + - test -f $PREFIX/include/cudf/io/csv.hpp - test -f $PREFIX/include/cudf/io/data_sink.hpp - test -f $PREFIX/include/cudf/io/datasource.hpp - - test -f $PREFIX/include/cudf/io/orc_metadata.hpp - - test -f $PREFIX/include/cudf/io/csv.hpp - test -f $PREFIX/include/cudf/io/detail/avro.hpp - test -f $PREFIX/include/cudf/io/detail/csv.hpp - test -f $PREFIX/include/cudf/io/detail/json.hpp @@ -129,8 +128,15 @@ test: - test -f $PREFIX/include/cudf/io/detail/parquet.hpp - test -f $PREFIX/include/cudf/io/detail/utils.hpp - test -f $PREFIX/include/cudf/io/json.hpp + - test -f $PREFIX/include/cudf/io/orc_metadata.hpp - test -f $PREFIX/include/cudf/io/orc.hpp - test -f $PREFIX/include/cudf/io/parquet.hpp + - test -f $PREFIX/include/cudf/io/text/data_chunk_source_factories.hpp + - test -f $PREFIX/include/cudf/io/text/data_chunk_source.hpp + - test -f $PREFIX/include/cudf/io/text/detail/multistate.hpp + - test -f $PREFIX/include/cudf/io/text/detail/tile_state.hpp + - test -f $PREFIX/include/cudf/io/text/detail/trie.hpp + - test -f $PREFIX/include/cudf/io/text/multibyte_split.hpp - test -f $PREFIX/include/cudf/io/types.hpp - test -f $PREFIX/include/cudf/ipc.hpp - test -f $PREFIX/include/cudf/join.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d6b457a94d4..d9a493f57a0 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -307,6 +307,7 @@ add_library(cudf src/io/parquet/writer_impl.cu src/io/statistics/orc_column_statistics.cu src/io/statistics/parquet_column_statistics.cu + src/io/text/multibyte_split.cu src/io/utilities/column_buffer.cpp src/io/utilities/data_sink.cpp src/io/utilities/datasource.cpp diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 56f17dc7090..b3b92003573 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -245,3 +245,8 @@ ConfigureBench(STRINGS_BENCH # - json benchmark ------------------------------------------------------------------- ConfigureBench(JSON_BENCH string/json_benchmark.cpp) + +################################################################################################### +# - io benchmark --------------------------------------------------------------------- +ConfigureBench(MULTIBYTE_SPLIT_BENCHMARK + io/text/multibyte_split_benchmark.cpp) diff --git a/cpp/benchmarks/io/cuio_benchmark_common.hpp b/cpp/benchmarks/io/cuio_benchmark_common.hpp index 2c49386a901..7107585dbcc 100644 --- a/cpp/benchmarks/io/cuio_benchmark_common.hpp +++ b/cpp/benchmarks/io/cuio_benchmark_common.hpp @@ -33,6 +33,8 @@ using cudf::io::io_type; benchmark(name##_buffer_output, type_or_group, static_cast(io_type::HOST_BUFFER)); \ benchmark(name##_void_output, type_or_group, static_cast(io_type::VOID)); +std::string random_file_in_dir(std::string const& dir_path); + /** * @brief Class to create a coupled `source_info` and `sink_info` of given type. */ diff --git a/cpp/benchmarks/io/text/multibyte_split_benchmark.cpp b/cpp/benchmarks/io/text/multibyte_split_benchmark.cpp new file mode 100644 index 00000000000..cb8a61caa57 --- /dev/null +++ b/cpp/benchmarks/io/text/multibyte_split_benchmark.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include + +using cudf::test::fixed_width_column_wrapper; + +temp_directory const temp_dir("cudf_gbench"); + +enum data_chunk_source_type { + device, + file, + host, +}; + +static cudf::string_scalar create_random_input(int32_t num_chars, + double delim_factor, + double deviation, + std::string delim) +{ + auto const num_delims = static_cast((num_chars * delim_factor) / delim.size()); + auto const num_delim_chars = num_delims * delim.size(); + auto const num_value_chars = num_chars - num_delim_chars; + auto const num_rows = num_delims; + auto const value_size_avg = static_cast(num_value_chars / num_rows); + auto const value_size_min = static_cast(value_size_avg * (1 - deviation)); + auto const value_size_max = static_cast(value_size_avg * (1 + deviation)); + + data_profile table_profile; + + table_profile.set_distribution_params( // + cudf::type_id::STRING, + distribution_id::NORMAL, + value_size_min, + value_size_max); + + auto const values_table = create_random_table( // + {cudf::type_id::STRING}, + 1, + row_count{num_rows}, + table_profile); + + auto delim_scalar = cudf::make_string_scalar(delim); + auto delims_column = cudf::make_column_from_scalar(*delim_scalar, num_rows); + auto input_table = cudf::table_view({values_table->get_column(0).view(), delims_column->view()}); + auto input_column = cudf::strings::concatenate(input_table); + + // extract the chars from the returned strings column. + auto input_column_contents = input_column->release(); + auto chars_column_contents = input_column_contents.children[1]->release(); + auto chars_buffer = chars_column_contents.data.release(); + + // turn the chars in to a string scalar. 
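// moving the released device_buffer into the scalar transfers ownership of the generated
// characters, so no additional copy is made when constructing the benchmark input.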
+ return cudf::string_scalar(std::move(*chars_buffer)); +} + +static void BM_multibyte_split(benchmark::State& state) +{ + auto source_type = static_cast(state.range(0)); + auto delim_size = state.range(1); + auto delim_percent = state.range(2); + auto file_size_approx = state.range(3); + + CUDF_EXPECTS(delim_percent >= 1, "delimiter percent must be at least 1"); + CUDF_EXPECTS(delim_percent <= 50, "delimiter percent must be at most 50"); + + auto delim = std::string(":", delim_size); + + auto delim_factor = static_cast(delim_percent) / 100; + auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim); + auto host_input = thrust::host_vector(device_input.size()); + auto host_string = std::string(host_input.data(), host_input.size()); + + cudaMemcpyAsync(host_input.data(), + device_input.data(), + device_input.size() * sizeof(char), + cudaMemcpyDeviceToHost, + rmm::cuda_stream_default); + + auto temp_file_name = random_file_in_dir(temp_dir.path()); + + { + auto temp_fostream = std::ofstream(temp_file_name, std::ofstream::out); + temp_fostream.write(host_input.data(), host_input.size()); + } + + cudaDeviceSynchronize(); + + auto source = std::unique_ptr(nullptr); + + switch (source_type) { + case data_chunk_source_type::file: // + source = cudf::io::text::make_source_from_file(temp_file_name); + break; + case data_chunk_source_type::host: // + source = cudf::io::text::make_source(host_string); + break; + case data_chunk_source_type::device: // + source = cudf::io::text::make_source(device_input); + break; + default: CUDF_FAIL(); + } + + for (auto _ : state) { + cuda_event_timer raii(state, true); + auto output = cudf::io::text::multibyte_split(*source, delim); + } + + state.SetBytesProcessed(state.iterations() * device_input.size()); +} + +class MultibyteSplitBenchmark : public cudf::benchmark { +}; + +#define TRANSPOSE_BM_BENCHMARK_DEFINE(name) \ + BENCHMARK_DEFINE_F(MultibyteSplitBenchmark, name)(::benchmark::State & state) \ + { \ + BM_multibyte_split(state); \ + } \ + BENCHMARK_REGISTER_F(MultibyteSplitBenchmark, name) \ + ->ArgsProduct({{data_chunk_source_type::device, \ + data_chunk_source_type::file, \ + data_chunk_source_type::host}, \ + {1, 4, 7}, \ + {1, 25}, \ + {1 << 15, 1 << 30}}) \ + ->UseManualTime() \ + ->Unit(::benchmark::kMillisecond); + +TRANSPOSE_BM_BENCHMARK_DEFINE(multibyte_split_simple); diff --git a/cpp/include/cudf/column/column_factories.hpp b/cpp/include/cudf/column/column_factories.hpp index bdb7fd48e60..ebd7f5bbef0 100644 --- a/cpp/include/cudf/column/column_factories.hpp +++ b/cpp/include/cudf/column/column_factories.hpp @@ -442,6 +442,26 @@ std::unique_ptr make_strings_column( rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @brief Construct a STRING type column given offsets, columns, and optional null count and null + * mask. + * + * @param[in] num_strings The number of strings the column represents. + * @param[in] offsets The offset values for this column. The number of elements is one more than the + * total number of strings so the `offset[last] - offset[0]` is the total number of bytes in the + * strings vector. + * @param[in] chars The char bytes for all the strings for this column. Individual strings are + * identified by the offsets and the nullmask. + * @param[in] null_mask The bits specifying the null strings in device memory. Arrow format for + * nulls is used for interpreting this bitmask. 
+ * @param[in] null_count The number of null string entries. + */ +std::unique_ptr make_strings_column(size_type num_strings, + rmm::device_uvector&& offsets, + rmm::device_uvector&& chars, + rmm::device_buffer&& null_mask = {}, + size_type null_count = cudf::UNKNOWN_NULL_COUNT); + /** * @brief Construct a LIST type column given offsets column, child column, null mask and null * count. diff --git a/cpp/include/cudf/io/text/data_chunk_source.hpp b/cpp/include/cudf/io/text/data_chunk_source.hpp new file mode 100644 index 00000000000..6ee1fa033d0 --- /dev/null +++ b/cpp/include/cudf/io/text/data_chunk_source.hpp @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +namespace cudf { +namespace io { +namespace text { + +/** + * @brief a reader capable of producing views over device memory. + * + * The data chunk reader API encapsulates the idea of statefully traversing and loading a data + * source. A data source may be a file, a region of device memory, or a region of host memory. + * Reading data from these data sources efficiently requires different strategies dependings on the + * type of data source, type of compression, capabilities of the host and device, the data's + * destination. Whole-file decompression should be hidden behind this interface + * + */ +class data_chunk_reader { + public: + /** + * @brief Get the next chunk of bytes from the data source + * + * Performs any necessary work to read and prepare the underlying data source for consumption as a + * view over device memory. Common implementations may read from a file, copy data from host + * memory, allocate temporary memory, perform iterative decompression, or even launch device + * kernels. + * + * @param size number of bytes to read. + * @param stream stream to associate allocations or perform work required to obtain chunk + * @return a chunk of data up to @param size bytes. May return less than @param size bytes if + * reader reaches end of underlying data source. Returned data must be accessed in stream order + * relative to the specified @param stream. + */ + virtual device_span get_next_chunk(std::size_t size, + rmm::cuda_stream_view stream) = 0; +}; + +/** + * @brief a data source capable of creating a reader which can produce views of the data source in + * device memory. + * + */ +class data_chunk_source { + public: + virtual std::unique_ptr create_reader() const = 0; +}; + +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp new file mode 100644 index 00000000000..f6807c1c9a8 --- /dev/null +++ b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace cudf { +namespace io { +namespace text { + +namespace { + +/** + * @brief a reader which produces views of device memory which contain a copy of the data from an + * istream. + * + */ +class istream_data_chunk_reader : public data_chunk_reader { + struct host_ticket { + cudaEvent_t event; + thrust::host_vector> buffer; + }; + + public: + istream_data_chunk_reader(std::unique_ptr datastream) + : _datastream(std::move(datastream)), _buffers(), _tickets(2) + { + // create an event to track the completion of the last device-to-host copy. + for (std::size_t i = 0; i < _tickets.size(); i++) { + CUDA_TRY(cudaEventCreate(&(_tickets[i].event))); + } + } + + ~istream_data_chunk_reader() + { + for (std::size_t i = 0; i < _tickets.size(); i++) { + CUDA_TRY(cudaEventDestroy(_tickets[i].event)); + } + } + + device_span find_or_create_data(std::size_t size, rmm::cuda_stream_view stream) + { + auto search = _buffers.find(stream.value()); + + if (search == _buffers.end() || search->second.size() < size) { + _buffers[stream.value()] = rmm::device_buffer(size, stream); + } + + return device_span(static_cast(_buffers[stream.value()].data()), size); + } + + device_span get_next_chunk(std::size_t read_size, + rmm::cuda_stream_view stream) override + { + CUDF_FUNC_RANGE(); + + auto& h_ticket = _tickets[_next_ticket_idx]; + + _next_ticket_idx = (_next_ticket_idx + 1) % _tickets.size(); + + // synchronize on the last host-to-device copy, so we don't clobber the host buffer. + CUDA_TRY(cudaEventSynchronize(h_ticket.event)); + + // resize the host buffer as necessary to contain the requested number of bytes + if (h_ticket.buffer.size() < read_size) { h_ticket.buffer.resize(read_size); } + + // read data from the host istream in to the pinned host memory buffer + _datastream->read(h_ticket.buffer.data(), read_size); + + // adjust the read size to reflect how many bytes were actually read from the data stream + read_size = _datastream->gcount(); + + // get a view over some device memory we can use to buffer the read data on to device. + auto chunk_span = find_or_create_data(read_size, stream); + + // copy the host-pinned data on to device + CUDA_TRY(cudaMemcpyAsync( // + chunk_span.data(), + h_ticket.buffer.data(), + read_size, + cudaMemcpyHostToDevice, + stream.value())); + + // record the host-to-device copy. + CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value())); + + // return the view over device memory so it can be processed. 
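// note: the span returned below aliases per-stream scratch memory owned by this reader, so its
// contents are only valid until the next call to get_next_chunk() on the same stream, and (since
// the copy above is asynchronous) it must be consumed in stream order on `stream`.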
+ return chunk_span; + } + + private: + std::size_t _next_ticket_idx = 0; + std::unique_ptr _datastream; + std::unordered_map _buffers; + std::vector _tickets; +}; + +/** + * @brief a reader which produces view of device memory which represent a subset of the input device + * span + * + */ +class device_span_data_chunk_reader : public data_chunk_reader { + public: + device_span_data_chunk_reader(device_span data) : _data(data) {} + + device_span get_next_chunk(std::size_t read_size, + rmm::cuda_stream_view stream) override + { + // limit the read size to the number of bytes remaining in the device_span. + if (read_size > _data.size() - _position) { read_size = _data.size() - _position; } + + // create a view over the device span + auto chunk_span = _data.subspan(_position, read_size); + + // increment position + _position += read_size; + + // return the view over device memory so it can be processed. + return chunk_span; + } + + private: + device_span _data; + uint64_t _position = 0; +}; + +/** + * @brief a file data source which creates an istream_data_chunk_reader + * + */ +class file_data_chunk_source : public data_chunk_source { + public: + file_data_chunk_source(std::string filename) : _filename(filename) {} + std::unique_ptr create_reader() const override + { + return std::make_unique( + std::make_unique(_filename, std::ifstream::in)); + } + + private: + std::string _filename; +}; + +/** + * @brief a host string data source which creates an istream_data_chunk_reader + */ +class string_data_chunk_source : public data_chunk_source { + public: + string_data_chunk_source(std::string const& data) : _data(data) {} + std::unique_ptr create_reader() const override + { + return std::make_unique(std::make_unique(_data)); + } + + private: + std::string const& _data; +}; + +/** + * @brief a device span data source which creates an istream_data_chunk_reader + */ +class device_span_data_chunk_source : public data_chunk_source { + public: + device_span_data_chunk_source(device_span data) : _data(data) {} + std::unique_ptr create_reader() const override + { + return std::make_unique(_data); + } + + private: + device_span _data; +}; + +} // namespace + +/** + * @brief Creates a data source capable of producing device-buffered views of the given string. + */ +std::unique_ptr make_source(std::string const& data) +{ + return std::make_unique(data); +} + +/** + * @brief Creates a data source capable of producing device-buffered views of the file + */ +std::unique_ptr make_source_from_file(std::string const& filename) +{ + return std::make_unique(filename); +} + +/** + * @brief Creates a data source capable of producing views of the given device string scalar + */ +std::unique_ptr make_source(cudf::string_scalar& data) +{ + auto data_span = device_span(data.data(), data.size()); + return std::make_unique(data_span); +} + +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/include/cudf/io/text/detail/multistate.hpp b/cpp/include/cudf/io/text/detail/multistate.hpp new file mode 100644 index 00000000000..d3c8909ab51 --- /dev/null +++ b/cpp/include/cudf/io/text/detail/multistate.hpp @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace cudf { +namespace io { +namespace text { +namespace detail { + +/** + * @brief Represents up to 7 segments + */ +struct multistate { + private: + /** + * @brief represents a (head, tail] segment, stored as a single 8 bit value + */ + struct multistate_segment { + public: + /** + * @brief Creates a segment which represents (0, 0] + */ + + constexpr multistate_segment() : _data(0) {} + /** + * @brief Creates a segment which represents (head, tail] + * + * @param head the (head, ____] value. Undefined behavior for values >= 16 + * @param tail the (____, tail] value. Undefined behavior for values >= 16 + */ + + constexpr multistate_segment(uint8_t head, uint8_t tail) : _data((head & 0b1111) | (tail << 4)) + { + } + + /** + * @brief Get's the (head, ____] value from the segment. + */ + constexpr uint8_t get_head() const { return _data & 0b1111; } + + /** + * @brief Get's the (____, tail] value from the segment. + */ + constexpr uint8_t get_tail() const { return _data >> 4; } + + private: + uint8_t _data; + }; + + public: + /** + * @brief The maximum state (head or tail) this multistate can represent + */ + + static auto constexpr max_segment_value = 15; + /** + * @brief The maximum number of segments this multistate can represent + */ + static auto constexpr max_segment_count = 7; + + /** + * @brief Enqueues a (head, tail] segment to this multistate + * + * @note: The behavior of this function is undefined if size() => max_segment_count + */ + constexpr void enqueue(uint8_t head, uint8_t tail) + { + _segments[_size++] = multistate_segment(head, tail); + } + + /** + * @brief get's the number of segments this multistate represents + */ + constexpr uint8_t size() const { return _size; } + + /** + * @brief get's the highest (____, tail] value this multistate represents + */ + constexpr uint8_t max_tail() const + { + uint8_t maximum = 0; + + for (uint8_t i = 0; i < _size; i++) { + maximum = std::max(maximum, get_tail(i)); + } + + return maximum; + } + + /** + * @brief get's the Nth (head, ____] value state this multistate represents + */ + constexpr uint8_t get_head(uint8_t idx) const { return _segments[idx].get_head(); } + + /** + * @brief get's the Nth (____, tail] value state this multistate represents + */ + constexpr uint8_t get_tail(uint8_t idx) const { return _segments[idx].get_tail(); } + + private: + uint8_t _size = 0; + multistate_segment _segments[max_segment_count]; +}; + +/** + * @brief associatively inner-joins transition histories. + * + * Examples: + * <(0, 5]> + <(5, 9]> = <(0, 9]> + * <(0, 5]> + <(6, 9]> = <> + * <(0, 1], (0, 2]> + <(2, 3], (1, 4]> = <(0, 4], (0, 3]> + * <(0, 1], (0, 2]> + <(1, 3]> = <(0, 3]> + * + * Head and tail value are limited to [0, 1, ..., 16] + * + * @param lhs past segments + * @param rhs future segments + * @return full join of past and future segments + */ +constexpr multistate operator+(multistate const& lhs, multistate const& rhs) +{ + // combine two multistates together by full-joining LHS tails to RHS heads, + // and taking the corresponding LHS heads and RHS tails. 
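// e.g. for lhs = <(0, 1], (0, 2]> and rhs = <(2, 3], (1, 4]>: lhs tail 1 joins rhs head 1 to give
// (0, 4], and lhs tail 2 joins rhs head 2 to give (0, 3], producing <(0, 4], (0, 3]> as in the
// examples documented above.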
+ + multistate result; + for (uint8_t lhs_idx = 0; lhs_idx < lhs.size(); lhs_idx++) { + auto tail = lhs.get_tail(lhs_idx); + for (uint8_t rhs_idx = 0; rhs_idx < rhs.size(); rhs_idx++) { + auto head = rhs.get_head(rhs_idx); + if (tail == head) { result.enqueue(lhs.get_head(lhs_idx), rhs.get_tail(rhs_idx)); } + } + } + return result; +} + +} // namespace detail +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/include/cudf/io/text/detail/tile_state.hpp b/cpp/include/cudf/io/text/detail/tile_state.hpp new file mode 100644 index 00000000000..849d857597b --- /dev/null +++ b/cpp/include/cudf/io/text/detail/tile_state.hpp @@ -0,0 +1,134 @@ + +#pragma once + +#include + +#include + +namespace cudf { +namespace io { +namespace text { +namespace detail { + +enum class scan_tile_status : uint8_t { + oob, + invalid, + partial, + inclusive, +}; + +template +struct scan_tile_state_view { + uint64_t num_tiles; + cuda::atomic* tile_status; + T* tile_partial; + T* tile_inclusive; + + __device__ inline void set_status(cudf::size_type tile_idx, scan_tile_status status) + { + auto const offset = (tile_idx + num_tiles) % num_tiles; + tile_status[offset].store(status, cuda::memory_order_relaxed); + } + + __device__ inline void set_partial_prefix(cudf::size_type tile_idx, T value) + { + auto const offset = (tile_idx + num_tiles) % num_tiles; + cub::ThreadStore(tile_partial + offset, value); + tile_status[offset].store(scan_tile_status::partial); + } + + __device__ inline void set_inclusive_prefix(cudf::size_type tile_idx, T value) + { + auto const offset = (tile_idx + num_tiles) % num_tiles; + cub::ThreadStore(tile_inclusive + offset, value); + tile_status[offset].store(scan_tile_status::inclusive); + } + + __device__ inline T get_prefix(cudf::size_type tile_idx, scan_tile_status& status) + { + auto const offset = (tile_idx + num_tiles) % num_tiles; + + while ((status = tile_status[offset].load(cuda::memory_order_relaxed)) == + scan_tile_status::invalid) {} + + if (status == scan_tile_status::partial) { + return cub::ThreadLoad(tile_partial + offset); + } else { + return cub::ThreadLoad(tile_inclusive + offset); + } + } +}; + +template +struct scan_tile_state { + rmm::device_uvector> tile_status; + rmm::device_uvector tile_state_partial; + rmm::device_uvector tile_state_inclusive; + + scan_tile_state(cudf::size_type num_tiles, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) + : tile_status(rmm::device_uvector>( + num_tiles, stream, mr)), + tile_state_partial(rmm::device_uvector(num_tiles, stream, mr)), + tile_state_inclusive(rmm::device_uvector(num_tiles, stream, mr)) + { + } + + operator scan_tile_state_view() + { + return scan_tile_state_view{tile_status.size(), + tile_status.data(), + tile_state_partial.data(), + tile_state_inclusive.data()}; + } + + inline T get_inclusive_prefix(cudf::size_type tile_idx, rmm::cuda_stream_view stream) const + { + auto const offset = (tile_idx + tile_status.size()) % tile_status.size(); + return tile_state_inclusive.element(offset, stream); + } +}; + +template +struct scan_tile_state_callback { + __device__ inline scan_tile_state_callback(scan_tile_state_view& tile_state, + cudf::size_type tile_idx) + : _tile_state(tile_state), _tile_idx(tile_idx) + { + } + + __device__ inline T operator()(T const& block_aggregate) + { + T exclusive_prefix; + + if (threadIdx.x == 0) { + _tile_state.set_partial_prefix(_tile_idx, block_aggregate); + + auto predecessor_idx = _tile_idx - 1; + auto 
predecessor_status = scan_tile_status::invalid; + + // scan partials to form prefix + + auto window_partial = _tile_state.get_prefix(predecessor_idx, predecessor_status); + while (predecessor_status != scan_tile_status::inclusive) { + predecessor_idx--; + auto predecessor_prefix = _tile_state.get_prefix(predecessor_idx, predecessor_status); + window_partial = predecessor_prefix + window_partial; + } + exclusive_prefix = window_partial; + + _tile_state.set_inclusive_prefix(_tile_idx, exclusive_prefix + block_aggregate); + } + + return exclusive_prefix; + } + + scan_tile_state_view& _tile_state; + cudf::size_type _tile_idx; +}; + +} // namespace detail +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/include/cudf/io/text/detail/trie.hpp b/cpp/include/cudf/io/text/detail/trie.hpp new file mode 100644 index 00000000000..d14fe15b0a9 --- /dev/null +++ b/cpp/include/cudf/io/text/detail/trie.hpp @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace cudf { +namespace io { +namespace text { +namespace detail { + +struct trie_node { + char token; + uint8_t match_length; + uint8_t child_begin; +}; + +struct trie_device_view { + device_span _nodes; + + /** + * @brief create a multistate which contains all partial path matches for the given token. + */ + constexpr multistate transition_init(char c) + { + auto result = multistate(); + + result.enqueue(0, 0); + + for (uint8_t curr = 0; curr < _nodes.size() - 1; curr++) { + transition_enqueue_all(c, result, curr, curr); + } + return result; + } + + /** + * @brief create a new multistate by transitioning all states in the multistate by the given token + * + * Eliminates any partial matches that cannot transition using the given token. + * + * @note always enqueues (0, 0] as the first state of the returned multistate. + */ + constexpr multistate transition(char c, multistate const& states) + { + auto result = multistate(); + + result.enqueue(0, 0); + + for (uint8_t i = 0; i < states.size(); i++) { + transition_enqueue_all(c, result, states.get_head(i), states.get_tail(i)); + } + + return result; + } + + /** + * @brief returns true if the given index is associated with a matching state. + */ + constexpr bool is_match(uint16_t idx) { return static_cast(get_match_length(idx)); } + + /** + * @brief returns the match length if the given index is associated with a matching state, + * otherwise zero. + */ + constexpr uint8_t get_match_length(uint16_t idx) { return _nodes[idx].match_length; } + + /** + * @brief returns the longest matching state of any state in the multistate. 
+ */ + template + constexpr uint8_t get_match_length(multistate const& states) + { + int8_t val = 0; + for (uint8_t i = 0; i < states.size(); i++) { + auto match_length = get_match_length(states.get_tail(i)); + if (match_length > val) { val = match_length; } + } + return val; + } + + private: + constexpr void transition_enqueue_all( // + char c, + multistate& states, + uint8_t head, + uint8_t curr) + { + for (uint32_t tail = _nodes[curr].child_begin; tail < _nodes[curr + 1].child_begin; tail++) { + if (_nodes[tail].token == c) { // + states.enqueue(head, tail); + } + } + } +}; + +/** + * @brief A flat trie contained in device memory. + */ +struct trie { + private: + cudf::size_type _max_duplicate_tokens; + rmm::device_uvector _nodes; + + trie(cudf::size_type max_duplicate_tokens, rmm::device_uvector&& nodes) + : _max_duplicate_tokens(max_duplicate_tokens), _nodes(std::move(nodes)) + { + } + + /** + * @brief Used to build a hierarchical trie which can then be flattened. + */ + struct trie_builder_node { + uint8_t match_length; + std::unordered_map> children; + + /** + * @brief Insert the string in to the trie tree, growing the trie as necessary + */ + void insert(std::string s) { insert(s.c_str(), s.size(), 0); } + + private: + trie_builder_node& insert(char const* s, uint16_t size, uint8_t depth) + { + if (size == 0) { + match_length = depth; + return *this; + } + + if (children[*s] == nullptr) { children[*s] = std::make_unique(); } + + return children[*s]->insert(s + 1, size - 1, depth + 1); + } + }; + + public: + /** + * @brief Gets the number of nodes contained in this trie. + */ + cudf::size_type size() const { return _nodes.size(); } + + /** + * @brief A pessimistic count of duplicate tokens in the trie. Used to determine the maximum + * possible stack size required to compute matches of this trie in parallel. + */ + cudf::size_type max_duplicate_tokens() const { return _max_duplicate_tokens; } + + /** + * @brief Create a trie which represents the given pattern. + * + * @param pattern The pattern to store in the trie + * @param stream The stream to use for allocation and copy + * @param mr Memory resource to use for the device memory allocation + * @return The trie. + */ + static trie create(std::string const& pattern, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) + + { + return create(std::vector{pattern}, stream, mr); + } + + /** + * @brief Create a trie which represents the given pattern. + * + * @param pattern The patterns to store in the trie + * @param stream The stream to use for allocation and copy + * @param mr Memory resource to use for the device memory allocation + * @return The trie. 
+ */ + static trie create(std::vector const& patterns, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) + { + std::vector tokens; + std::vector transitions; + std::vector match_length; + + // create the trie tree + auto root = std::make_unique(); + for (auto& pattern : patterns) { + root->insert(pattern); + } + + // flatten + auto sum = 1; + transitions.emplace_back(sum); + match_length.emplace_back(root->match_length); + + auto builder_nodes = std::queue>(); + builder_nodes.push(std::move(root)); + + tokens.emplace_back(0); + + while (builder_nodes.size()) { + auto layer_size = builder_nodes.size(); + for (uint32_t i = 0; i < layer_size; i++) { + auto node = std::move(builder_nodes.front()); + builder_nodes.pop(); + sum += node->children.size(); + transitions.emplace_back(sum); + for (auto& item : node->children) { + match_length.emplace_back(item.second->match_length); + tokens.emplace_back(item.first); + builder_nodes.push(std::move(item.second)); + } + } + } + + tokens.emplace_back(0); + + match_length.emplace_back(0); + + std::vector trie_nodes; + auto token_counts = std::unordered_map(); + + for (uint32_t i = 0; i < tokens.size(); i++) { + trie_nodes.emplace_back(trie_node{tokens[i], match_length[i], transitions[i]}); + token_counts[tokens[i]]++; + } + + auto most_common_token = + std::max_element(token_counts.begin(), token_counts.end(), [](auto const& a, auto const& b) { + return a.second < b.second; + }); + + auto max_duplicate_tokens = most_common_token->second; + + return trie{max_duplicate_tokens, + cudf::detail::make_device_uvector_sync(trie_nodes, stream, mr)}; + } + + trie_device_view view() const { return trie_device_view{_nodes}; } +}; + +} // namespace detail +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/include/cudf/io/text/multibyte_split.hpp b/cpp/include/cudf/io/text/multibyte_split.hpp new file mode 100644 index 00000000000..d42ee9f510e --- /dev/null +++ b/cpp/include/cudf/io/text/multibyte_split.hpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +#include + +namespace cudf { +namespace io { +namespace text { + +std::unique_ptr multibyte_split( + data_chunk_source const& source, + std::string const& delimiter, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/src/io/text/multibyte_split.cu b/cpp/src/io/text/multibyte_split.cu new file mode 100644 index 00000000000..662ec744680 --- /dev/null +++ b/cpp/src/io/text/multibyte_split.cu @@ -0,0 +1,396 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include + +namespace { + +using cudf::io::text::detail::multistate; + +int32_t constexpr ITEMS_PER_THREAD = 32; +int32_t constexpr THREADS_PER_TILE = 128; +int32_t constexpr ITEMS_PER_TILE = ITEMS_PER_THREAD * THREADS_PER_TILE; +int32_t constexpr TILES_PER_CHUNK = 1024; +int32_t constexpr ITEMS_PER_CHUNK = ITEMS_PER_TILE * TILES_PER_CHUNK; + +struct PatternScan { + using BlockScan = cub::BlockScan; + using BlockScanCallback = cudf::io::text::detail::scan_tile_state_callback; + + struct _TempStorage { + typename BlockScan::TempStorage scan; + }; + + _TempStorage& _temp_storage; + + using TempStorage = cub::Uninitialized<_TempStorage>; + + __device__ inline PatternScan(TempStorage& temp_storage) : _temp_storage(temp_storage.Alias()) {} + + __device__ inline void Scan(cudf::size_type tile_idx, + cudf::io::text::detail::scan_tile_state_view tile_state, + cudf::io::text::detail::trie_device_view trie, + char (&thread_data)[ITEMS_PER_THREAD], + uint32_t (&thread_state)[ITEMS_PER_THREAD]) + { + auto thread_multistate = trie.transition_init(thread_data[0]); + + for (uint32_t i = 1; i < ITEMS_PER_THREAD; i++) { + thread_multistate = trie.transition(thread_data[i], thread_multistate); + } + + auto prefix_callback = BlockScanCallback(tile_state, tile_idx); + + BlockScan(_temp_storage.scan) + .ExclusiveSum(thread_multistate, thread_multistate, prefix_callback); + + for (uint32_t i = 0; i < ITEMS_PER_THREAD; i++) { + thread_multistate = trie.transition(thread_data[i], thread_multistate); + + thread_state[i] = thread_multistate.max_tail(); + } + } +}; + +// multibyte_split works by splitting up inputs in to 32 inputs (bytes) per thread, and transforming +// them in to data structures called "multistates". these multistates are created by searching a +// trie, but instead of a tradition trie where the search begins at a single node at the beginning, +// we allow our search to begin anywhere within the trie tree. The position within the trie tree is +// stored as a "partial match path", which indicates "we can get from here to there by a set of +// specific transitions". By scanning together multistates, we effectively know "we can get here +// from the beginning by following the inputs". By doing this, each thread knows exactly what state +// it begins in. From there, each thread can then take deterministic action. In this case, the +// deterministic action is counting and outputting delimiter offsets when a delimiter is found. 
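// For intuition, the snippet below is a standalone sequential sketch (an editorial illustration,
// not part of this change) of the offsets this file is meant to produce: as the kernels below are
// written, a delimiter offset is recorded at every byte position where the delimiter ends
// (overlapping matches are not erased, cf. the OverlappingMatchErasure test), and those offsets,
// bracketed by 0 and the total input size, become the offsets child of the returned strings
// column. The helper name and the extra includes are illustrative only.

#include <cstddef>
#include <string>
#include <vector>

std::vector<std::size_t> multibyte_split_offsets_reference(std::string const& text,
                                                           std::string const& delimiter)
{
  auto offsets = std::vector<std::size_t>{0};  // the first row always begins at offset zero
  for (std::size_t pos = 0; pos + delimiter.size() <= text.size(); pos++) {
    if (text.compare(pos, delimiter.size(), delimiter) == 0) {
      offsets.push_back(pos + delimiter.size());  // a row ends just past its delimiter
    }
  }
  offsets.push_back(text.size());  // the final (possibly empty) row runs to the end of the input
  return offsets;
}

// e.g. multibyte_split_offsets_reference("ababacabacab", "abac") yields {0, 6, 10, 12}, which
// corresponds to the rows {"ababac", "abac", "ab"} checked by the NondeterministicMatching test.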
+ +__global__ void multibyte_split_init_kernel( + cudf::size_type base_tile_idx, + cudf::size_type num_tiles, + cudf::io::text::detail::scan_tile_state_view tile_multistates, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_status status = + cudf::io::text::detail::scan_tile_status::invalid) +{ + auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x; + if (thread_idx < num_tiles) { + auto const tile_idx = base_tile_idx + thread_idx; + tile_multistates.set_status(tile_idx, status); + tile_output_offsets.set_status(tile_idx, status); + } +} + +__global__ void multibyte_split_seed_kernel( + cudf::io::text::detail::scan_tile_state_view tile_multistates, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + multistate tile_multistate_seed, + uint32_t tile_output_offset) +{ + auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x; + if (thread_idx == 0) { + tile_multistates.set_inclusive_prefix(-1, tile_multistate_seed); + tile_output_offsets.set_inclusive_prefix(-1, tile_output_offset); + } +} + +__global__ void multibyte_split_kernel( + cudf::size_type base_tile_idx, + cudf::io::text::detail::scan_tile_state_view tile_multistates, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::trie_device_view trie, + int32_t chunk_input_offset, + cudf::device_span chunk_input_chars, + cudf::device_span abs_output_delimiter_offsets, + cudf::device_span abs_output_chars) +{ + using InputLoad = + cub::BlockLoad; + using OffsetScan = cub::BlockScan; + using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; + + __shared__ union { + typename InputLoad::TempStorage input_load; + typename PatternScan::TempStorage pattern_scan; + typename OffsetScan::TempStorage offset_scan; + } temp_storage; + + int32_t const tile_idx = base_tile_idx + blockIdx.x; + int32_t const tile_input_offset = blockIdx.x * ITEMS_PER_TILE; + int32_t const thread_input_offset = tile_input_offset + threadIdx.x * ITEMS_PER_THREAD; + int32_t const thread_input_size = chunk_input_chars.size() - thread_input_offset; + + // STEP 1: Load inputs + + char thread_chars[ITEMS_PER_THREAD]; + + InputLoad(temp_storage.input_load) + .Load(chunk_input_chars.data() + tile_input_offset, + thread_chars, + chunk_input_chars.size() - tile_input_offset); + + // STEP 2: Scan inputs to determine absolute thread states + + uint32_t thread_states[ITEMS_PER_THREAD]; + + __syncthreads(); // required before temp_memory re-use + PatternScan(temp_storage.pattern_scan) + .Scan(tile_idx, tile_multistates, trie, thread_chars, thread_states); + + // STEP 3: Flag matches + + uint32_t thread_offsets[ITEMS_PER_THREAD]; + + for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) { + thread_offsets[i] = i < thread_input_size and trie.is_match(thread_states[i]); + } + + // STEP 4: Scan flags to determine absolute thread output offset + + auto prefix_callback = OffsetScanCallback(tile_output_offsets, tile_idx); + + __syncthreads(); // required before temp_memory re-use + OffsetScan(temp_storage.offset_scan) + .ExclusiveSum(thread_offsets, thread_offsets, prefix_callback); + + // Step 5: Assign outputs from each thread using match offsets. 
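// At this point the exclusive scan above (plus the cross-tile prefix supplied by the callback)
// has converted the per-byte match flags into absolute output positions: thread_offsets[i] is the
// index in abs_output_delimiter_offsets at which the offset of the delimiter ending at byte i
// belongs.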
+ + if (abs_output_chars.size() > 0) { + for (int32_t i = 0; i < ITEMS_PER_THREAD and i < thread_input_size; i++) { + abs_output_chars[chunk_input_offset + thread_input_offset + i] = thread_chars[i]; + } + } + + if (abs_output_delimiter_offsets.size() > 0) { + for (int32_t i = 0; i < ITEMS_PER_THREAD and i < thread_input_size; i++) { + if (trie.is_match(thread_states[i])) { + auto const match_end = base_tile_idx * ITEMS_PER_TILE + thread_input_offset + i + 1; + abs_output_delimiter_offsets[thread_offsets[i]] = match_end; + } + } + } +} + +} // namespace + +namespace cudf { +namespace io { +namespace text { +namespace detail { + +void fork_stream(std::vector streams, rmm::cuda_stream_view stream) +{ + cudaEvent_t event; + cudaEventCreate(&event); + cudaEventRecord(event, stream); + for (uint32_t i = 0; i < streams.size(); i++) { + cudaStreamWaitEvent(streams[i], event, 0); + } + cudaEventDestroy(event); +} + +void join_stream(std::vector streams, rmm::cuda_stream_view stream) +{ + cudaEvent_t event; + cudaEventCreate(&event); + for (uint32_t i = 0; i < streams.size(); i++) { + cudaEventRecord(event, streams[i]); + cudaStreamWaitEvent(stream, event, 0); + } + cudaEventDestroy(event); +} + +std::vector get_streams(int32_t count, rmm::cuda_stream_pool& stream_pool) +{ + auto streams = std::vector(); + for (int32_t i = 0; i < count; i++) { + streams.emplace_back(stream_pool.get_stream()); + } + return streams; +} + +cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_source const& source, + cudf::io::text::detail::trie const& trie, + scan_tile_state& tile_multistates, + scan_tile_state& tile_offsets, + device_span output_buffer, + device_span output_char_buffer, + rmm::cuda_stream_view stream, + std::vector const& streams) +{ + CUDF_FUNC_RANGE(); + cudf::size_type chunk_offset = 0; + + multibyte_split_init_kernel<<>>( // + -TILES_PER_CHUNK, + TILES_PER_CHUNK, + tile_multistates, + tile_offsets, + cudf::io::text::detail::scan_tile_status::oob); + + auto multistate_seed = multistate(); + multistate_seed.enqueue(0, 0); // this represents the first state in the pattern. + + // Seeding the tile state with an identity value allows the 0th tile to follow the same logic as + // the Nth tile, assuming it can look up an inclusive prefix. Without this seed, the 0th block + // would have to follow seperate logic. 
+ multibyte_split_seed_kernel<<<1, 1, 0, stream.value()>>>( // + tile_multistates, + tile_offsets, + multistate_seed, + 0); + + fork_stream(streams, stream); + + auto reader = source.create_reader(); + + cudaEvent_t last_launch_event; + cudaEventCreate(&last_launch_event); + + for (int32_t i = 0; true; i++) { + auto base_tile_idx = i * TILES_PER_CHUNK; + auto chunk_stream = streams[i % streams.size()]; + auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, chunk_stream); + + if (chunk.size() == 0) { break; } + + auto tiles_in_launch = + cudf::util::div_rounding_up_safe(chunk.size(), static_cast(ITEMS_PER_TILE)); + + // reset the next chunk of tile state + multibyte_split_init_kernel<<>>( // + base_tile_idx, + tiles_in_launch, + tile_multistates, + tile_offsets); + + cudaStreamWaitEvent(chunk_stream, last_launch_event, 0); + + multibyte_split_kernel<<>>( // + base_tile_idx, + tile_multistates, + tile_offsets, + trie.view(), + chunk_offset, + chunk, + output_buffer, + output_char_buffer); + + cudaEventRecord(last_launch_event, chunk_stream); + + chunk_offset += chunk.size(); + } + + cudaEventDestroy(last_launch_event); + + join_stream(streams, stream); + + return chunk_offset; +} + +std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source const& source, + std::string const& delimiter, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr, + rmm::cuda_stream_pool& stream_pool) +{ + CUDF_FUNC_RANGE(); + auto const trie = cudf::io::text::detail::trie::create({delimiter}, stream); + + CUDF_EXPECTS(trie.max_duplicate_tokens() < multistate::max_segment_count, + "delimiter contains too many duplicate tokens to produce a deterministic result."); + + CUDF_EXPECTS(trie.size() < multistate::max_segment_value, + "delimiter contains too many total tokens to produce a deterministic result."); + + auto concurrency = 2; + // must be at least 32 when using warp-reduce on partials + // must be at least 1 more than max possible concurrent tiles + // best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s + auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32); + auto tile_multistates = scan_tile_state(num_tile_states, stream); + auto tile_offsets = scan_tile_state(num_tile_states, stream); + + auto streams = get_streams(concurrency, stream_pool); + + auto bytes_total = + multibyte_split_scan_full_source(source, + trie, + tile_multistates, + tile_offsets, + cudf::device_span(static_cast(nullptr), 0), + cudf::device_span(static_cast(nullptr), 0), + stream, + streams); + + // allocate results + auto num_tiles = cudf::util::div_rounding_up_safe(bytes_total, ITEMS_PER_TILE); + auto num_results = tile_offsets.get_inclusive_prefix(num_tiles - 1, stream); + auto string_offsets = rmm::device_uvector(num_results + 2, stream, mr); + auto string_chars = rmm::device_uvector(bytes_total, stream, mr); + + // first and last element are set manually to zero and size of input, respectively. 
+ // kernel is only responsible for determining delimiter offsets + auto string_count = static_cast(string_offsets.size() - 1); + string_offsets.set_element_to_zero_async(0, stream); + string_offsets.set_element_async(string_count, bytes_total, stream); + + multibyte_split_scan_full_source( + source, + trie, + tile_multistates, + tile_offsets, + cudf::device_span(string_offsets).subspan(1, num_results), + string_chars, + stream, + streams); + + return cudf::make_strings_column( + string_count, std::move(string_offsets), std::move(string_chars)); +} + +} // namespace detail + +std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source const& source, + std::string const& delimiter, + rmm::mr::device_memory_resource* mr) +{ + auto stream = rmm::cuda_stream_default; + auto stream_pool = rmm::cuda_stream_pool(2); + auto result = detail::multibyte_split(source, delimiter, stream, mr, stream_pool); + + stream.synchronize(); + + return result; +} + +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/src/strings/strings_column_factories.cu b/cpp/src/strings/strings_column_factories.cu index abf1f9599dc..c89f1b756d6 100644 --- a/cpp/src/strings/strings_column_factories.cu +++ b/cpp/src/strings/strings_column_factories.cu @@ -137,4 +137,46 @@ std::unique_ptr make_strings_column(size_type num_strings, std::move(children)); } +std::unique_ptr make_strings_column(size_type num_strings, + rmm::device_uvector&& offsets, + rmm::device_uvector&& chars, + rmm::device_buffer&& null_mask, + size_type null_count) +{ + CUDF_FUNC_RANGE(); + + auto const offsets_size = static_cast(offsets.size()); + auto const chars_size = static_cast(chars.size()); + + if (null_count > 0) CUDF_EXPECTS(null_mask.size() > 0, "Column with nulls must be nullable."); + + CUDF_EXPECTS(num_strings == offsets_size - 1, "Invalid offsets column size for strings column."); + + auto offsets_column = std::make_unique( // + data_type{type_id::INT32}, + offsets_size, + offsets.release(), + rmm::device_buffer(), + 0); + + auto chars_column = std::make_unique( // + data_type{type_id::INT8}, + chars_size, + chars.release(), + rmm::device_buffer(), + 0); + + auto children = std::vector>(); + + children.emplace_back(std::move(offsets_column)); + children.emplace_back(std::move(chars_column)); + + return std::make_unique(data_type{type_id::STRING}, + num_strings, + rmm::device_buffer{}, + std::move(null_mask), + null_count, + std::move(children)); +} + } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 19421e3115d..edfbba74eb1 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -195,6 +195,7 @@ ConfigureTest(ORC_TEST io/orc_test.cpp) ConfigureTest(PARQUET_TEST io/parquet_test.cpp) ConfigureTest(JSON_TEST io/json_test.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) +ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) if(CUDF_ENABLE_ARROW_S3) target_compile_definitions(ARROW_IO_SOURCE_TEST PRIVATE "S3_ENABLED") endif() diff --git a/cpp/tests/io/text/multibyte_split_test.cpp b/cpp/tests/io/text/multibyte_split_test.cpp new file mode 100644 index 00000000000..d1fa787e000 --- /dev/null +++ b/cpp/tests/io/text/multibyte_split_test.cpp @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace cudf; +using namespace test; + +// 😀 | F0 9F 98 80 | 11110000 10011111 10011000 10000000 +// 😎 | F0 9F 98 8E | 11110000 10011111 10011000 10001110 + +struct MultibyteSplitTest : public BaseFixture { +}; + +TEST_F(MultibyteSplitTest, NondeterministicMatching) +{ + auto delimiter = std::string("abac"); + auto host_input = std::string("ababacabacab"); + + auto expected = strings_column_wrapper{"ababac", "abac", "ab"}; + + auto source = cudf::io::text::make_source(host_input); + auto out = cudf::io::text::multibyte_split(*source, delimiter); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *out); +} + +TEST_F(MultibyteSplitTest, DelimiterAtEnd) +{ + auto delimiter = std::string(":"); + auto host_input = std::string("abcdefg:"); + + auto expected = strings_column_wrapper{"abcdefg:", ""}; + + auto source = cudf::io::text::make_source(host_input); + auto out = cudf::io::text::multibyte_split(*source, delimiter); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *out); +} + +TEST_F(MultibyteSplitTest, LargeInput) +{ + auto host_input = std::string(); + auto host_expected = std::vector(); + + for (auto i = 0; i < (2 * 32 * 128 * 1024); i++) { + host_input += "...:|"; + host_expected.emplace_back(std::string("...:|")); + } + + host_expected.emplace_back(std::string("")); + + auto expected = strings_column_wrapper{host_expected.begin(), host_expected.end()}; + + auto delimiter = std::string("...:|"); + auto source = cudf::io::text::make_source(host_input); + auto out = cudf::io::text::multibyte_split(*source, delimiter); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *out); +} + +TEST_F(MultibyteSplitTest, OverlappingMatchErasure) +{ + auto delimiter = "::"; + + auto host_input = std::string( + ":::::" + ":::::"); + auto expected = strings_column_wrapper{":::::", ":::::"}; + + auto source = cudf::io::text::make_source(host_input); + auto out = cudf::io::text::multibyte_split(*source, delimiter); + + // CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *out); // this use case it not yet supported. 
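// As currently written, the scan appears to flag a match at every position where the delimiter
// ends, so the overlapping "::" matches inside ":::::" are all reported rather than being erased
// greedily; the expectation above stays commented out until overlapping-match erasure is
// supported.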
+} + +TEST_F(MultibyteSplitTest, HandpickedInput) +{ + auto delimiters = "::|"; + auto host_input = std::string( + "aaa::|" + "bbb::|" + "ccc::|" + "ddd::|" + "eee::|" + "fff::|" + "ggg::|" + "hhh::|" + "___::|" + "here::|" + "is::|" + "another::|" + "simple::|" + "text::|" + "seperated::|" + "by::|" + "emojis::|" + "which::|" + "are::|" + "multiple::|" + "bytes::|" + "and::|" + "used::|" + "as::|" + "delimiters.::|" + "::|" + "::|" + "::|"); + + auto expected = strings_column_wrapper{ + "aaa::|", "bbb::|", "ccc::|", "ddd::|", "eee::|", "fff::|", + "ggg::|", "hhh::|", "___::|", "here::|", "is::|", "another::|", + "simple::|", "text::|", "seperated::|", "by::|", "emojis::|", "which::|", + "are::|", "multiple::|", "bytes::|", "and::|", "used::|", "as::|", + "delimiters.::|", "::|", "::|", "::|", ""}; + + auto source = cudf::io::text::make_source(host_input); + auto out = cudf::io::text::multibyte_split(*source, delimiters); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *out, debug_output_level::ALL_ERRORS); +}
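// For reference, a minimal usage sketch of the API added in this change. This is a standalone
// illustrative program (not part of the diff); main(), the sample input, and the printed output
// are assumptions for demonstration, while the headers and functions follow the declarations
// added above.

#include <cudf/column/column.hpp>
#include <cudf/io/text/data_chunk_source_factories.hpp>
#include <cudf/io/text/multibyte_split.hpp>

#include <iostream>
#include <memory>
#include <string>

int main()
{
  auto const delimiter = std::string("::|");
  auto const input     = std::string("aaa::|bbb::|ccc");

  auto const source = cudf::io::text::make_source(input);
  auto const column = cudf::io::text::multibyte_split(*source, delimiter);

  // one row per delimiter occurrence, plus one trailing row for the remaining characters
  std::cout << "rows: " << column->size() << std::endl;  // expected: 3

  return 0;
}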