Commit 5200af1: s3 filesystem bindings

kszucs committed Sep 24, 2019 (1 parent: c6faaed)
Showing 24 changed files with 345 additions and 230 deletions.
6 changes: 6 additions & 0 deletions cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -2535,6 +2535,12 @@ if(ARROW_S3)
include_directories(SYSTEM ${AWSSDK_INCLUDE_DIR})
message(STATUS "Found AWS SDK headers: ${AWSSDK_INCLUDE_DIR}")
message(STATUS "Found AWS SDK libraries: ${AWSSDK_LINK_LIBRARIES}")

+if(APPLE)
+  set_target_properties(AWS::aws-c-common PROPERTIES
+    INTERFACE_LINK_LIBRARIES "-pthread;pthread;-framework CoreFoundation"
+  )
+endif()
endif()

# Write out the package configurations.
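Note on the block above: it overrides INTERFACE_LINK_LIBRARIES on the imported AWS::aws-c-common target so that pthread and the CoreFoundation framework, which aws-c-common requires on macOS, propagate to everything that links the S3 filesystem; this appears to work around link flags that the AWS SDK's own CMake config does not export correctly there.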
4 changes: 4 additions & 0 deletions python/CMakeLists.txt
@@ -385,6 +385,10 @@ set(CYTHON_EXTENSIONS lib _fs _csv _json)

set(LINK_LIBS arrow_shared arrow_python_shared)

+if(PYARROW_BUILD_S3)
+  set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _s3)
+endif()

if(PYARROW_BUILD_CUDA)
# Arrow CUDA
find_package(ArrowCuda)
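With the block above, the new _s3 Cython extension is compiled only when the Python build is configured with PYARROW_BUILD_S3 enabled (for example, passing -DPYARROW_BUILD_S3=on to cmake), mirroring the existing PYARROW_BUILD_CUDA switch.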
4 changes: 2 additions & 2 deletions python/pyarrow/_csv.pyx
@@ -466,7 +466,7 @@ cdef class ConvertOptions:
self.options.include_missing_columns = value


-cdef _get_reader(input_file, shared_ptr[InputStream]* out):
+cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
use_memory_map = False
get_input_stream(input_file, use_memory_map, out)

@@ -522,7 +522,7 @@ def read_csv(input_file, read_options=None, parse_options=None,
Contents of the CSV file as an in-memory table.
"""
cdef:
-        shared_ptr[InputStream] stream
+        shared_ptr[CInputStream] stream
CCSVReadOptions c_read_options
CCSVParseOptions c_parse_options
CCSVConvertOptions c_convert_options
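The InputStream -> CInputStream rename in this file, and the matching renames in the files below, follow pyarrow's Cython convention of prefixing declarations of wrapped C++ classes with C (CInputStream, COutputStream, CRandomAccessFile), freeing the unprefixed names for the Python-level wrapper classes.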
4 changes: 2 additions & 2 deletions python/pyarrow/_cuda.pyx
@@ -729,7 +729,7 @@ cdef class BufferReader(NativeFile):
self.buffer = obj
self.reader = new CCudaBufferReader(self.buffer.buffer)
self.set_random_access_file(
-            shared_ptr[RandomAccessFile](self.reader))
+            shared_ptr[CRandomAccessFile](self.reader))
self.is_readable = True

def read_buffer(self, nbytes=None):
@@ -776,7 +776,7 @@ cdef class BufferWriter(NativeFile):
def __cinit__(self, CudaBuffer buffer):
self.buffer = buffer
self.writer = new CCudaBufferWriter(self.buffer.cuda_buffer)
-        self.set_output_stream(shared_ptr[OutputStream](self.writer))
+        self.set_output_stream(shared_ptr[COutputStream](self.writer))
self.is_writable = True

def writeat(self, int64_t position, object data):
13 changes: 2 additions & 11 deletions python/pyarrow/_fs.pyx
@@ -22,17 +22,8 @@ import six
from pyarrow.compat import frombytes, tobytes
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport PyDateTime_from_TimePoint
-from pyarrow.includes.libarrow_fs cimport *
from pyarrow.util import _stringify_path
-from pyarrow.lib import _detect_compression
-from pyarrow.lib cimport (
-    check_status,
-    NativeFile,
-    BufferedOutputStream,
-    BufferedInputStream,
-    CompressedInputStream,
-    CompressedOutputStream
-)
+from pyarrow.lib cimport *


cdef inline c_string _path_as_bytes(path) except *:
@@ -523,4 +514,4 @@ cdef class SubTreeFileSystem(FileSystem):

cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
-        self.subtreefs = <CSubTreeFileSystem*> wrapped.get()
\ No newline at end of file
+        self.subtreefs = <CSubTreeFileSystem*> wrapped.get()
4 changes: 2 additions & 2 deletions python/pyarrow/_json.pyx
@@ -135,7 +135,7 @@ cdef class ParseOptions:
self.options.newlines_in_values = value


-cdef _get_reader(input_file, shared_ptr[InputStream]* out):
+cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
use_memory_map = False
get_input_stream(input_file, use_memory_map, out)

@@ -175,7 +175,7 @@ def read_json(input_file, read_options=None, parse_options=None,
Contents of the JSON file as an in-memory table.
"""
cdef:
-        shared_ptr[InputStream] stream
+        shared_ptr[CInputStream] stream
CJSONReadOptions c_read_options
CJSONParseOptions c_parse_options
shared_ptr[CJSONReader] reader
4 changes: 2 additions & 2 deletions python/pyarrow/_orc.pxd
@@ -28,7 +28,7 @@ from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
CKeyValueMetadata,
CRecordBatch,
CTable,
-                                        RandomAccessFile, OutputStream,
+                                        CRandomAccessFile, COutputStream,
TimeUnit)


@@ -37,7 +37,7 @@ cdef extern from "arrow/adapters/orc/adapter.h" \

cdef cppclass ORCFileReader:
@staticmethod
-        CStatus Open(const shared_ptr[RandomAccessFile]& file,
+        CStatus Open(const shared_ptr[CRandomAccessFile]& file,
CMemoryPool* pool,
unique_ptr[ORCFileReader]* reader)

2 changes: 1 addition & 1 deletion python/pyarrow/_orc.pyx
@@ -46,7 +46,7 @@ cdef class ORCReader:

def open(self, object source, c_bool use_memory_map=True):
cdef:
-            shared_ptr[RandomAccessFile] rd_handle
+            shared_ptr[CRandomAccessFile] rd_handle

self.source = source

10 changes: 5 additions & 5 deletions python/pyarrow/_parquet.pxd
@@ -24,7 +24,7 @@ from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CChunkedArray, CSchema, CStatus,
CTable, CMemoryPool, CBuffer,
CKeyValueMetadata,
-                                        RandomAccessFile, OutputStream,
+                                        CRandomAccessFile, COutputStream,
TimeUnit)


@@ -316,7 +316,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
unique_ptr[CRowGroupMetaData] RowGroup(int i)
const SchemaDescriptor* schema()
shared_ptr[const CKeyValueMetadata] key_value_metadata() const
-        void WriteTo(OutputStream* dst) const
+        void WriteTo(COutputStream* dst) const

cdef shared_ptr[CFileMetaData] CFileMetaData_Make \
" parquet::FileMetaData::Make"(const void* serialized_metadata,
@@ -406,7 +406,7 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:

cdef cppclass FileReaderBuilder:
FileReaderBuilder()
-        CStatus Open(const shared_ptr[RandomAccessFile]& file,
+        CStatus Open(const shared_ptr[CRandomAccessFile]& file,
const CReaderProperties& properties,
const shared_ptr[CFileMetaData]& metadata)

@@ -435,7 +435,7 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:

@staticmethod
CStatus Open(const CSchema& schema, CMemoryPool* pool,
-                     const shared_ptr[OutputStream]& sink,
+                     const shared_ptr[COutputStream]& sink,
const shared_ptr[WriterProperties]& properties,
const shared_ptr[ArrowWriterProperties]& arrow_properties,
unique_ptr[FileWriter]* writer)
@@ -448,4 +448,4 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:

CStatus WriteMetaDataFile(
const CFileMetaData& file_metadata,
-        const OutputStream* sink)
+        const COutputStream* sink)
8 changes: 4 additions & 4 deletions python/pyarrow/_parquet.pyx
@@ -568,7 +568,7 @@ cdef class FileMetaData:
def __reduce__(self):
cdef:
NativeFile sink = BufferOutputStream()
-            OutputStream* c_sink = sink.get_output_stream().get()
+            COutputStream* c_sink = sink.get_output_stream().get()
with nogil:
self._metadata.WriteTo(c_sink)

@@ -694,7 +694,7 @@ cdef class FileMetaData:
Write the metadata object to a metadata-only file
"""
cdef:
-            shared_ptr[OutputStream] sink
+            shared_ptr[COutputStream] sink
c_string c_where

try:
@@ -1010,7 +1010,7 @@ cdef class ParquetReader:
read_dictionary=None, FileMetaData metadata=None,
int buffer_size=0):
cdef:
-            shared_ptr[RandomAccessFile] rd_handle
+            shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
CReaderProperties properties = default_reader_properties()
ArrowReaderProperties arrow_props = (
@@ -1202,7 +1202,7 @@ cdef class ParquetReader:
cdef class ParquetWriter:
cdef:
unique_ptr[FileWriter] writer
-        shared_ptr[OutputStream] sink
+        shared_ptr[COutputStream] sink
bint own_sink

cdef readonly:
52 changes: 52 additions & 0 deletions python/pyarrow/_s3.pyx
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

import six

from pyarrow.compat import frombytes, tobytes
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport PyDateTime_from_TimePoint
from pyarrow.lib import _detect_compression
from pyarrow.lib cimport *


cdef class S3FileSystem(FileSystem):

    cdef:
        CS3FileSystem* s3fs

    def __init__(self, str access_key, str secret_key, str region='us-east-1',
                 str scheme='https', str endpoint_override=None):
        cdef:
            CS3Options options
            shared_ptr[CS3FileSystem] wrapped

        options.access_key = tobytes(access_key)
        options.secret_key = tobytes(secret_key)
        options.region = tobytes(region)
        options.scheme = tobytes(scheme)
        if endpoint_override is not None:
            options.endpoint_override = tobytes(endpoint_override)

        check_status(CS3FileSystem.Make(options, &wrapped))
        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        FileSystem.init(self, wrapped)
        self.s3fs = <CS3FileSystem*> wrapped.get()
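A minimal usage sketch of the new binding, for orientation; the bucket, keys, and the open_input_stream call (inherited from the FileSystem base class in _fs.pyx, not part of this diff) are illustrative assumptions:

    from pyarrow.fs import S3FileSystem

    # Placeholder credentials; endpoint_override could point at a local
    # S3-compatible service instead of AWS.
    s3 = S3FileSystem(access_key='<access-key>', secret_key='<secret-key>',
                      region='us-east-1')

    # I/O goes through the methods inherited from FileSystem:
    with s3.open_input_stream('my-bucket/path/to/object.csv') as f:
        data = f.read()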
4 changes: 2 additions & 2 deletions python/pyarrow/feather.pxi
@@ -34,7 +34,7 @@ cdef class FeatherWriter:
self.num_rows = -1

def open(self, object dest):
-        cdef shared_ptr[OutputStream] sink
+        cdef shared_ptr[COutputStream] sink
get_writer(dest, &sink)

with nogil:
@@ -76,7 +76,7 @@ cdef class FeatherReader:
pass

def open(self, source, c_bool use_memory_map=True):
-        cdef shared_ptr[RandomAccessFile] reader
+        cdef shared_ptr[CRandomAccessFile] reader
get_reader(source, use_memory_map, &reader)

with nogil:
5 changes: 5 additions & 0 deletions python/pyarrow/fs.py
@@ -18,3 +18,8 @@
from __future__ import absolute_import

from pyarrow._fs import * # noqa

+try:
+    from pyarrow._s3 import * # noqa
+except ImportError:
+    pass
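Because the bindings are optional, downstream code can probe for them at runtime; a small sketch (have_s3 is an illustrative name):

    try:
        from pyarrow.fs import S3FileSystem
        have_s3 = True
    except ImportError:
        have_s3 = False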
(Diff truncated: the remaining 11 changed files are not shown.)