Skip to content

Commit

Permalink
Draft C++ shared memory IPC workflow and related refactoring / scaffo…
Browse files Browse the repository at this point in the history
…lding /

cleaning.

* Add Google flatbuffers dependency
* Flatbuffers IDL draft in collaboration with Jacques N and Steven P
* Add Schema wrapper in Cython
* Remove unneeded physical layout types from type.h
* Refactor ListType to be a nested type with a single child
* Implement shared memory round-trip for numeric row batches
* mmap-based shared memory interface and MemorySource abstract API
  • Loading branch information
wesm committed Mar 22, 2016
1 parent 5881aac commit 6b88094
Show file tree
Hide file tree
Showing 83 changed files with 2,929 additions and 773 deletions.
7 changes: 6 additions & 1 deletion ci/travis_before_script_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ echo $GTEST_HOME

: ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}

cmake -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
# ARROW_TEST_MEMCHECK is switched on only for Linux builds. The variable is
# quoted so the test stays well-formed when TRAVIS_OS_NAME is unset or empty,
# and "=" is the portable string comparison operator for "[".
if [ "$TRAVIS_OS_NAME" = "linux" ]; then
  cmake -DARROW_TEST_MEMCHECK=on -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
else
  cmake -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
fi

make -j4
make install

Expand Down
6 changes: 1 addition & 5 deletions ci/travis_script_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ pushd $CPP_BUILD_DIR

make lint

if [ $TRAVIS_OS_NAME == "linux" ]; then
valgrind --tool=memcheck --leak-check=yes --error-exitcode=1 ctest
else
ctest
fi
ctest

popd
82 changes: 57 additions & 25 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,15 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(ARROW_PARQUET
"Build the Parquet adapter and link to libparquet"
OFF)

option(ARROW_TEST_MEMCHECK
"Run the test suite using valgrind --tool=memcheck"
OFF)
option(ARROW_BUILD_TESTS
"Build the Arrow googletest unit tests"
ON)
option(ARROW_IPC
"Build the Arrow IPC extensions"
ON)
endif()

if(NOT ARROW_BUILD_TESTS)
Expand Down Expand Up @@ -278,8 +283,16 @@ function(ADD_ARROW_TEST REL_TEST_NAME)
set(TEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME})
endif()

add_test(${TEST_NAME}
${BUILD_SUPPORT_DIR}/run-test.sh ${TEST_PATH})
# With ARROW_TEST_MEMCHECK enabled, the test target is compiled with
# ARROW_VALGRIND defined and registered to run under valgrind's memcheck tool;
# otherwise it is registered through the run-test.sh wrapper script.
if(ARROW_TEST_MEMCHECK)
  set_property(TARGET ${TEST_NAME}
    APPEND_STRING PROPERTY COMPILE_FLAGS " -DARROW_VALGRIND")
  add_test(${TEST_NAME}
    valgrind
      --tool=memcheck
      --leak-check=full
      --error-exitcode=1
      ${TEST_PATH})
else()
  add_test(${TEST_NAME} ${BUILD_SUPPORT_DIR}/run-test.sh ${TEST_PATH})
endif()
if(ARGN)
set_tests_properties(${TEST_NAME} PROPERTIES ${ARGN})
endif()
Expand Down Expand Up @@ -403,24 +416,10 @@ if (UNIX)
add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
--verbose=2
--linelength=90
--filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11
`find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h`)
--filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references
`find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'`)
endif (UNIX)

#----------------------------------------------------------------------
# Parquet adapter

if(ARROW_PARQUET)
find_package(Parquet REQUIRED)
include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
ADD_THIRDPARTY_LIB(parquet
STATIC_LIB ${PARQUET_STATIC_LIB}
SHARED_LIB ${PARQUET_SHARED_LIB})

add_subdirectory(src/arrow/parquet)
list(APPEND LINK_LIBS arrow_parquet parquet)
endif()

############################################################
# Subdirectories
############################################################
Expand All @@ -431,15 +430,18 @@ set(LIBARROW_LINK_LIBS
set(ARROW_SRCS
src/arrow/array.cc
src/arrow/builder.cc
src/arrow/column.cc
src/arrow/schema.cc
src/arrow/table.cc
src/arrow/type.cc

src/arrow/table/column.cc
src/arrow/table/schema.cc
src/arrow/table/table.cc
# IPC / Shared memory library; to be turned into an optional component
src/arrow/ipc/adapter.cc
src/arrow/ipc/memory.cc
src/arrow/ipc/metadata.cc
src/arrow/ipc/metadata-internal.cc

src/arrow/types/construct.cc
src/arrow/types/floating.cc
src/arrow/types/integer.cc
src/arrow/types/json.cc
src/arrow/types/list.cc
src/arrow/types/primitive.cc
Expand Down Expand Up @@ -475,9 +477,39 @@ target_link_libraries(arrow ${LIBARROW_LINK_LIBS})

add_subdirectory(src/arrow)
add_subdirectory(src/arrow/util)
add_subdirectory(src/arrow/table)
add_subdirectory(src/arrow/types)

install(TARGETS arrow
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)

#----------------------------------------------------------------------
# Parquet adapter library

if(ARROW_PARQUET)
find_package(Parquet REQUIRED)
include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
ADD_THIRDPARTY_LIB(parquet
STATIC_LIB ${PARQUET_STATIC_LIB}
SHARED_LIB ${PARQUET_SHARED_LIB})

add_subdirectory(src/arrow/parquet)
list(APPEND LINK_LIBS arrow_parquet parquet)
endif()

#----------------------------------------------------------------------
# IPC library

## Flatbuffers
if(ARROW_IPC)
  # REQUIRED is passed, so the find module is asked to treat a missing
  # Flatbuffers installation as an error.
  find_package(Flatbuffers REQUIRED)

  # Echo what the find module resolved to make misconfiguration easy to spot.
  message(STATUS "Flatbuffers include dir: ${FLATBUFFERS_INCLUDE_DIR}")
  message(STATUS "Flatbuffers static library: ${FLATBUFFERS_STATIC_LIB}")
  message(STATUS "Flatbuffers compiler: ${FLATBUFFERS_COMPILER}")

  include_directories(SYSTEM ${FLATBUFFERS_INCLUDE_DIR})

  # Wrap the prebuilt static archive in an imported target named "flatbuffers".
  add_library(flatbuffers STATIC IMPORTED)
  set_target_properties(flatbuffers
    PROPERTIES IMPORTED_LOCATION ${FLATBUFFERS_STATIC_LIB})

  add_subdirectory(src/arrow/ipc)
endif()
95 changes: 95 additions & 0 deletions cpp/cmake_modules/FindFlatbuffers.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Tries to find Flatbuffers headers and libraries.
#
# Usage of this module as follows:
#
# find_package(Flatbuffers)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# Flatbuffers_HOME -
# When set, this path is inspected instead of standard library locations as
# the root of the Flatbuffers installation. The environment variable
# FLATBUFFERS_HOME overrides this variable.
#
# This module defines:
# FLATBUFFERS_INCLUDE_DIR, directory containing headers
# FLATBUFFERS_LIBS, directory containing flatbuffers libraries
# FLATBUFFERS_STATIC_LIB, path to libflatbuffers.a
# FLATBUFFERS_COMPILER, path to the flatc compiler executable
# FLATBUFFERS_FOUND, whether flatbuffers has been found

# Collect the candidate installation roots. The FLATBUFFERS_HOME environment
# variable wins over the Flatbuffers_HOME CMake variable when both are set.
if(NOT "$ENV{FLATBUFFERS_HOME}" STREQUAL "")
  # Normalise the environment value to CMake's forward-slash path form.
  file(TO_CMAKE_PATH "$ENV{FLATBUFFERS_HOME}" _native_path)
  list(APPEND _flatbuffers_roots ${_native_path})
elseif(Flatbuffers_HOME)
  list(APPEND _flatbuffers_roots ${Flatbuffers_HOME})
endif()

# When explicit roots were supplied, search only those (under "include" and
# "lib"); otherwise fall back to CMake's default search locations.
if(_flatbuffers_roots)
  find_path(FLATBUFFERS_INCLUDE_DIR
    NAMES flatbuffers/flatbuffers.h
    PATHS ${_flatbuffers_roots}
    NO_DEFAULT_PATH
    PATH_SUFFIXES "include")
  find_library(FLATBUFFERS_LIBRARIES
    NAMES flatbuffers
    PATHS ${_flatbuffers_roots}
    NO_DEFAULT_PATH
    PATH_SUFFIXES "lib")
else()
  find_path(FLATBUFFERS_INCLUDE_DIR NAMES flatbuffers/flatbuffers.h)
  find_library(FLATBUFFERS_LIBRARIES NAMES flatbuffers)
endif()

# Locate the flatc compiler. The explicit installation roots (taken from the
# FLATBUFFERS_HOME environment variable or the Flatbuffers_HOME CMake variable)
# are searched first, so the compiler is looked up in the same installation as
# the headers and the library; /usr/local and /usr remain as fallbacks.
# Using the collected roots also avoids probing "/bin" when the environment
# variable is unset.
find_program(FLATBUFFERS_COMPILER
  NAMES flatc
  PATHS ${_flatbuffers_roots} /usr/local /usr
  PATH_SUFFIXES bin
  NO_DEFAULT_PATH)

# The package counts as found only when both the header directory and the
# library were located. The static archive path is derived from the directory
# that contains the located library.
set(FLATBUFFERS_FOUND FALSE)
if(FLATBUFFERS_INCLUDE_DIR AND FLATBUFFERS_LIBRARIES)
  set(FLATBUFFERS_FOUND TRUE)
  get_filename_component(FLATBUFFERS_LIBS ${FLATBUFFERS_LIBRARIES} PATH)
  set(FLATBUFFERS_LIB_NAME libflatbuffers)
  set(FLATBUFFERS_STATIC_LIB ${FLATBUFFERS_LIBS}/${FLATBUFFERS_LIB_NAME}.a)
endif()

# Report the outcome of the search.
#  - Success is announced unless find_package() was called with QUIET.
#  - Failure is fatal whenever the package was requested as REQUIRED, even
#    with QUIET; otherwise it is reported as a status message unless QUIET.
if(FLATBUFFERS_FOUND)
  if(NOT Flatbuffers_FIND_QUIETLY)
    message(STATUS "Found the Flatbuffers library: ${FLATBUFFERS_LIBRARIES}")
  endif()
else()
  # Build the message once; the location suffix depends on whether explicit
  # roots were searched.
  set(FLATBUFFERS_ERR_MSG "Could not find the Flatbuffers library. Looked in")
  if(_flatbuffers_roots)
    set(FLATBUFFERS_ERR_MSG "${FLATBUFFERS_ERR_MSG} ${_flatbuffers_roots}.")
  else()
    set(FLATBUFFERS_ERR_MSG "${FLATBUFFERS_ERR_MSG} system search paths.")
  endif()

  if(Flatbuffers_FIND_REQUIRED)
    message(FATAL_ERROR "${FLATBUFFERS_ERR_MSG}")
  elseif(NOT Flatbuffers_FIND_QUIETLY)
    message(STATUS "${FLATBUFFERS_ERR_MSG}")
  endif()
endif()

# Hide the module's result variables from the default cache view.
# FLATBUFFERS_LIBRARIES is included because it is the cache entry that
# find_library() creates above.
mark_as_advanced(
  FLATBUFFERS_INCLUDE_DIR
  FLATBUFFERS_LIBRARIES
  FLATBUFFERS_LIBS
  FLATBUFFERS_STATIC_LIB
  FLATBUFFERS_COMPILER
)
1 change: 1 addition & 0 deletions cpp/setup_build_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ SOURCE_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
source thirdparty/versions.sh

export GTEST_HOME=$SOURCE_DIR/thirdparty/$GTEST_BASEDIR
export FLATBUFFERS_HOME=$SOURCE_DIR/thirdparty/installed

echo "Build env initialized"
6 changes: 6 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
install(FILES
api.h
array.h
column.h
builder.h
schema.h
table.h
type.h
DESTINATION include/arrow)

Expand All @@ -30,3 +33,6 @@ install(FILES
set(ARROW_TEST_LINK_LIBS ${ARROW_MIN_TEST_LIBS})

ADD_ARROW_TEST(array-test)
ADD_ARROW_TEST(column-test)
ADD_ARROW_TEST(schema-test)
ADD_ARROW_TEST(table-test)
11 changes: 5 additions & 6 deletions cpp/src/arrow/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@

#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/column.h"
#include "arrow/schema.h"
#include "arrow/table.h"
#include "arrow/type.h"

#include "arrow/table/column.h"
#include "arrow/table/schema.h"
#include "arrow/table/table.h"

#include "arrow/types/boolean.h"
#include "arrow/types/construct.h"
#include "arrow/types/floating.h"
#include "arrow/types/integer.h"
#include "arrow/types/list.h"
#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/types/struct.h"

#include "arrow/util/buffer.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

Expand Down
6 changes: 2 additions & 4 deletions cpp/src/arrow/array-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
#include "arrow/array.h"
#include "arrow/test-util.h"
#include "arrow/type.h"
#include "arrow/types/integer.h"
#include "arrow/types/primitive.h"
#include "arrow/util/buffer.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {

Expand All @@ -38,7 +36,7 @@ static TypePtr int32 = TypePtr(new Int32Type());
class TestArray : public ::testing::Test {
public:
void SetUp() {
pool_ = GetDefaultMemoryPool();
pool_ = default_memory_pool();
}

protected:
Expand Down Expand Up @@ -75,7 +73,7 @@ TEST_F(TestArray, TestIsNull) {
if (x > 0) ++null_count;
}

std::shared_ptr<Buffer> null_buf = bytes_to_null_buffer(nulls.data(),
std::shared_ptr<Buffer> null_buf = test::bytes_to_null_buffer(nulls.data(),
nulls.size());
std::unique_ptr<Array> arr;
arr.reset(new Array(int32, nulls.size(), null_count, null_buf));
Expand Down
25 changes: 20 additions & 5 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ namespace arrow {

Array::Array(const TypePtr& type, int32_t length, int32_t null_count,
const std::shared_ptr<Buffer>& nulls) {
Init(type, length, null_count, nulls);
}

void Array::Init(const TypePtr& type, int32_t length, int32_t null_count,
const std::shared_ptr<Buffer>& nulls) {
type_ = type;
length_ = length;
null_count_ = null_count;
Expand All @@ -42,4 +37,24 @@ void Array::Init(const TypePtr& type, int32_t length, int32_t null_count,
}
}

bool Array::Equals(const Array& other) const {
if (this == &other) return true;
if (null_count_ != other.null_count_) {
return false;
}
if (null_count_ > 0) {
return nulls_->Equals(*other.nulls_, util::bytes_for_bits(length_));
} else {
return true;
}
}

bool Array::Equals(const std::shared_ptr<Array>& arr) const {
if (this == arr.get()) return true;
if (this->type_enum() != arr->type_enum()) {
return false;
}
return Equals(*static_cast<const Array*>(arr.get()));
}

} // namespace arrow
Loading

0 comments on commit 6b88094

Please sign in to comment.