Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Arrow URI FileSystem backed instance to retrieve remote files #7709

Merged
merged 17 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/cmake/thirdparty/CUDF_GetArrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ function(find_and_configure_arrow VERSION BUILD_STATIC)
"ARROW_WITH_BACKTRACE ON"
"ARROW_CXXFLAGS -w"
"ARROW_JEMALLOC OFF"
"ARROW_S3 ON"
# Arrow modifies CMake's GLOBAL RULE_LAUNCH_COMPILE unless this is off
"ARROW_USE_CCACHE OFF"
"ARROW_ARMV8_ARCH ${ARROW_ARMV8_ARCH}"
Expand Down
33 changes: 33 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
#include <rmm/cuda_stream_view.hpp>

#include <arrow/buffer.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/s3fs.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>
#include <arrow/result.h>
#include <arrow/status.h>

#include <memory>

Expand Down Expand Up @@ -302,6 +306,34 @@ class arrow_io_source : public datasource {
};

public:
/**
* @brief Constructs an object from an Apache Arrow Filesystem URI
*
* @param Apache Arrow Filesystem URI
*/
explicit arrow_io_source(std::string_view arrow_uri)
{
const std::string uri_start_delimiter = "//";
const std::string uri_end_delimiter = "?";

arrow::Result<std::shared_ptr<arrow::fs::FileSystem>> result =
arrow::fs::FileSystemFromUri(static_cast<std::string>(arrow_uri));
CUDF_EXPECTS(result.ok(), "Failed to generate Arrow Filesystem instance from URI.");
filesystem = result.ValueOrDie();

// Parse the path from the URI
size_t start = arrow_uri.find(uri_start_delimiter) == std::string::npos
? 0
: arrow_uri.find(uri_start_delimiter) + uri_start_delimiter.size();
size_t end = arrow_uri.find(uri_end_delimiter) - start;
std::string_view path = arrow_uri.substr(start, end);

arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> in_stream =
filesystem->OpenInputFile(static_cast<std::string>(path).c_str());
CUDF_EXPECTS(in_stream.ok(), "Failed to open Arrow RandomAccessFile");
arrow_file = in_stream.ValueOrDie();
}

/**
* @brief Constructs an object from an `arrow` source object.
*
Expand Down Expand Up @@ -340,6 +372,7 @@ class arrow_io_source : public datasource {
}

private:
std::shared_ptr<arrow::fs::FileSystem> filesystem;
jrhemstad marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<arrow::io::RandomAccessFile> arrow_file;
};

Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp)
ConfigureTest(ORC_TEST io/orc_test.cpp)
ConfigureTest(PARQUET_TEST io/parquet_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)

###################################################################################################
# - sort tests ------------------------------------------------------------------------------------
Expand Down
84 changes: 84 additions & 0 deletions cpp/tests/io/arrow_io_source_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/io/datasource.hpp>
#include <cudf/io/json.hpp>
#include <cudf/io/parquet.hpp>

#include <arrow/io/api.h>

#include <fstream>
#include <memory>
#include <string>

// Global environment for temporary files
auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

// Base test fixture for tests
struct ArrowIOTest : public cudf::test::BaseFixture {
};

TEST_F(ArrowIOTest, URIFileSystem)
{
const std::string file_name = temp_env->get_temp_dir() + "JsonLinesFileTest.json";
std::ofstream outfile(file_name, std::ofstream::out);
outfile << "[11, 1.1]\n[22, 2.2]";
outfile.close();

std::string file_uri = "file://" + file_name;
std::unique_ptr<cudf::io::arrow_io_source> datasource =
std::make_unique<cudf::io::arrow_io_source>(file_uri);

// Populate the JSON Reader Options
cudf::io::json_reader_options options =
cudf::io::json_reader_options::builder(cudf::io::source_info(datasource.get())).lines(true);

// Read the JSON file from the LocalFileSystem
cudf::io::table_with_metadata tbl = cudf::io::read_json(options);

ASSERT_EQ(2, tbl.tbl->num_columns());
ASSERT_EQ(2, tbl.tbl->num_rows());
}

TEST_F(ArrowIOTest, S3FileSystem)
{
std::string s3_uri = "s3://rapidsai-data/cudf/test/tips.parquet?region=us-east-2";
std::unique_ptr<cudf::io::arrow_io_source> datasource =
std::make_unique<cudf::io::arrow_io_source>(s3_uri);

// Populate the Parquet Reader Options
cudf::io::source_info src(datasource.get());
std::vector<std::string> single_column;
single_column.insert(single_column.begin(), "total_bill");
cudf::io::parquet_reader_options_builder builder(src);
cudf::io::parquet_reader_options options = builder.columns(single_column).build();

// Read the Parquet file from S3
cudf::io::table_with_metadata tbl = cudf::io::read_parquet(options);

ASSERT_EQ(1, tbl.tbl->num_columns()); // Only single column specified in reader_options
ASSERT_EQ(244, tbl.tbl->num_rows()); // known number of rows from the S3 file
}

CUDF_TEST_PROGRAM_MAIN()