Skip to content

Commit

Permalink
Use Arrow URI FileSystem backed instance to retrieve remote files (#7709
Browse files Browse the repository at this point in the history
)

Arrow offers an API that lets users identify a target file by URI. This PR uses that API: it adds a new `arrow_io_source` constructor that accepts the URI from the user, creates the appropriate FileSystem instance, and configures it for access to that file.

This closes: #7475

Authors:
  - Jeremy Dyer (https://github.com/jdye64)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Robert Maynard (https://github.com/robertmaynard)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #7709
  • Loading branch information
jdye64 authored Jun 14, 2021
1 parent 26f5671 commit 6845e96
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp/cmake/thirdparty/CUDF_GetArrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ function(find_and_configure_arrow VERSION BUILD_STATIC)
"ARROW_WITH_BACKTRACE ON"
"ARROW_CXXFLAGS -w"
"ARROW_JEMALLOC OFF"
"ARROW_S3 ON"
# Arrow modifies CMake's GLOBAL RULE_LAUNCH_COMPILE unless this is off
"ARROW_USE_CCACHE OFF"
"ARROW_ARMV8_ARCH ${ARROW_ARMV8_ARCH}"
Expand Down
33 changes: 33 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
#include <rmm/cuda_stream_view.hpp>

#include <arrow/buffer.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/s3fs.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>
#include <arrow/result.h>
#include <arrow/status.h>

#include <memory>

Expand Down Expand Up @@ -302,6 +306,34 @@ class arrow_io_source : public datasource {
};

public:
/**
* @brief Constructs an object from an Apache Arrow Filesystem URI
*
* @param Apache Arrow Filesystem URI
*/
explicit arrow_io_source(std::string_view arrow_uri)
{
const std::string uri_start_delimiter = "//";
const std::string uri_end_delimiter = "?";

arrow::Result<std::shared_ptr<arrow::fs::FileSystem>> result =
arrow::fs::FileSystemFromUri(static_cast<std::string>(arrow_uri));
CUDF_EXPECTS(result.ok(), "Failed to generate Arrow Filesystem instance from URI.");
filesystem = result.ValueOrDie();

// Parse the path from the URI
size_t start = arrow_uri.find(uri_start_delimiter) == std::string::npos
? 0
: arrow_uri.find(uri_start_delimiter) + uri_start_delimiter.size();
size_t end = arrow_uri.find(uri_end_delimiter) - start;
std::string_view path = arrow_uri.substr(start, end);

arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> in_stream =
filesystem->OpenInputFile(static_cast<std::string>(path).c_str());
CUDF_EXPECTS(in_stream.ok(), "Failed to open Arrow RandomAccessFile");
arrow_file = in_stream.ValueOrDie();
}

/**
* @brief Constructs an object from an `arrow` source object.
*
Expand Down Expand Up @@ -340,6 +372,7 @@ class arrow_io_source : public datasource {
}

private:
std::shared_ptr<arrow::fs::FileSystem> filesystem;
std::shared_ptr<arrow::io::RandomAccessFile> arrow_file;
};

Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp)
ConfigureTest(ORC_TEST io/orc_test.cpp)
ConfigureTest(PARQUET_TEST io/parquet_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)

###################################################################################################
# - sort tests ------------------------------------------------------------------------------------
Expand Down
84 changes: 84 additions & 0 deletions cpp/tests/io/arrow_io_source_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/io/datasource.hpp>
#include <cudf/io/json.hpp>
#include <cudf/io/parquet.hpp>

#include <arrow/io/api.h>

#include <fstream>
#include <memory>
#include <string>

// Global environment for temporary files.
// A TempDirTestEnvironment is registered with GoogleTest at static-initialization
// time; the returned pointer is kept so tests can call get_temp_dir() on it.
auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

// Base test fixture for the arrow_io_source tests.
// Adds no state of its own; it only derives from cudf::test::BaseFixture.
struct ArrowIOTest : public cudf::test::BaseFixture {
};

TEST_F(ArrowIOTest, URIFileSystem)
{
  // Write a two-row JSON-lines fixture into the shared temporary directory.
  auto const json_path = temp_env->get_temp_dir() + "JsonLinesFileTest.json";
  {
    std::ofstream json_out(json_path);
    json_out << "[11, 1.1]\n[22, 2.2]";
  }  // leaving the scope closes the stream before the file is read back

  // Open the fixture through a `file://` URI.
  std::string const local_uri = "file://" + json_path;
  auto source                 = std::make_unique<cudf::io::arrow_io_source>(local_uri);

  // Configure the JSON reader for line-delimited input backed by the Arrow source.
  cudf::io::source_info src_info(source.get());
  cudf::io::json_reader_options options =
    cudf::io::json_reader_options::builder(src_info).lines(true);

  // Read the file back and check its shape: two rows of two values each.
  cudf::io::table_with_metadata result = cudf::io::read_json(options);

  ASSERT_EQ(2, result.tbl->num_columns());
  ASSERT_EQ(2, result.tbl->num_rows());
}

TEST_F(ArrowIOTest, S3FileSystem)
{
  // Open a Parquet file through an `s3://` URI that also carries the bucket region.
  std::string const remote_uri = "s3://rapidsai-data/cudf/test/tips.parquet?region=us-east-2";
  auto source                  = std::make_unique<cudf::io::arrow_io_source>(remote_uri);

  // Restrict the Parquet reader to a single named column.
  std::vector<std::string> const selected_columns{"total_bill"};
  cudf::io::parquet_reader_options options =
    cudf::io::parquet_reader_options_builder(cudf::io::source_info(source.get()))
      .columns(selected_columns)
      .build();

  // Read the file from S3 and check its shape.
  cudf::io::table_with_metadata result = cudf::io::read_parquet(options);

  ASSERT_EQ(1, result.tbl->num_columns());  // only the one requested column comes back
  ASSERT_EQ(244, result.tbl->num_rows());   // known number of rows in the S3 file
}

// NOTE(review): presumably expands to this test executable's main() entry point;
// the macro is defined outside this file, so confirm against its definition.
CUDF_TEST_PROGRAM_MAIN()

0 comments on commit 6845e96

Please sign in to comment.