diff --git a/CMakeLists.txt b/CMakeLists.txt index b7c746b79..bc9ede4e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -134,10 +134,12 @@ endif() if("${NEBULA_COMMON_REPO_URL}" STREQUAL "") SET(NEBULA_COMMON_REPO_URL "git@github.com:vesoft-inc-private/nebula-common.git") endif() +message(STATUS "NEBULA_COMMON_REPO_URL: " ${NEBULA_COMMON_REPO_URL}) if("${NEBULA_COMMON_REPO_TAG}" STREQUAL "") SET(NEBULA_COMMON_REPO_TAG "HEAD") endif() +message(STATUS "NEBULA_COMMON_REPO_TAG: " ${NEBULA_COMMON_REPO_TAG}) SET(NEBULA_COMMON_PACKAGE "nebula-common") @@ -154,6 +156,7 @@ add_dependent_project( find_package(${NEBULA_COMMON_PACKAGE} REQUIRED) message(STATUS ">>>>> The nebula-common repo has been configured successfully <<<<<") message(STATUS "") +include_directories(AFTER modules/common/src) # third-party if(NOT ${NEBULA_THIRDPARTY_ROOT} STREQUAL "") @@ -189,7 +192,30 @@ message(STATUS "CMAKE_INCLUDE_PATH: " ${INCLUDE_PATH_STR}) message(STATUS "CMAKE_LIBRARY_PATH: " ${LIBRARY_PATH_STR}) message(STATUS "CMAKE_PROGRAM_PATH: " ${PROGRAM_PATH_STR}) +find_package(Bzip2 REQUIRED) +find_package(DoubleConversion REQUIRED) +find_package(Fbthrift REQUIRED) +find_package(Folly REQUIRED) +find_package(Gflags REQUIRED) +find_package(Glog REQUIRED) +find_package(Googletest REQUIRED) +find_package(Jemalloc REQUIRED) +find_package(Libevent REQUIRED) +find_package(Mstch REQUIRED) +find_package(Proxygen REQUIRED) +find_package(Snappy REQUIRED) +find_package(Wangle REQUIRED) +find_package(ZLIB REQUIRED) +find_package(Zstd REQUIRED) +find_package(Boost REQUIRED) +find_package(OpenSSL REQUIRED) +find_package(Krb5 REQUIRED gssapi) +find_package(GPERF 2.8 REQUIRED) +find_package(Libunwind REQUIRED) +find_package(LibLZMA MODULE) + find_package(Rocksdb REQUIRED) +find_package(nebula-common REQUIRED) add_compile_options(-Wall) add_compile_options(-Werror) @@ -316,8 +342,6 @@ macro(nebula_add_library name type) endmacro() include_directories(AFTER ${NEBULA_HOME}/src) -include_directories(AFTER 
modules/common/src) -include_directories(AFTER modules/common/src/interface) include_directories(AFTER ${CMAKE_CURRENT_BINARY_DIR}/src) include_directories(AFTER ${CMAKE_CURRENT_BINARY_DIR}/src/kvstore/plugins) include_directories(AFTER ${CMAKE_CURRENT_BINARY_DIR}/src/kvstore/plugins/hbase) @@ -385,11 +409,11 @@ macro(nebula_link_libraries target) double-conversion resolv s2 + ${COMPRESSION_LIBRARIES} + ${JEMALLOC_LIB} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} ${KRB5_LIBRARIES} - ${COMPRESSION_LIBRARIES} - ${JEMALLOC_LIB} ${LIBUNWIND_LIBRARIES} dl ${GETTIME_LIB} @@ -404,7 +428,7 @@ function(nebula_add_subdirectory dir_name) add_subdirectory(${dir_name}) endfunction() -#add_subdirectory(src) +add_subdirectory(src) add_subdirectory(conf) add_subdirectory(scripts) diff --git a/cmake/FindBzip2.cmake b/cmake/FindBzip2.cmake new file mode 100644 index 000000000..70f944a3c --- /dev/null +++ b/cmake/FindBzip2.cmake @@ -0,0 +1,34 @@ +# - Try to find Bzip2 includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Bzip2) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Bzip2_FOUND System has bzip2, include and lib dirs found. +# Bzip2_INCLUDE_DIR The bzip2 includes directories. +# Bzip2_LIBRARY The bzip2 library. +# Bzip2_BIN The bzip2 binary. 
+ +find_path(Bzip2_INCLUDE_DIR NAMES bzlib.h) +find_library(Bzip2_LIBRARY NAMES libbz2.a) +find_program(Bzip2_BIN NAMES bzip2) + +if(Bzip2_INCLUDE_DIR AND Bzip2_LIBRARY AND Bzip2_BIN) + set(Bzip2_FOUND TRUE) + mark_as_advanced( + Bzip2_INCLUDE_DIR + Bzip2_LIBRARY + Bzip2_BIN + ) +endif() + +if(NOT Bzip2_FOUND) + message(FATAL_ERROR "Bzip2 doesn't exist") +endif() + + diff --git a/cmake/FindDoubleConversion.cmake b/cmake/FindDoubleConversion.cmake new file mode 100644 index 000000000..3580d889a --- /dev/null +++ b/cmake/FindDoubleConversion.cmake @@ -0,0 +1,30 @@ +# - Try to find libdouble-conversion includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(DoubleConversion) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# DoubleConversion_FOUND System has double-conversion, include and lib dirs found +# DoubleConversion_INCLUDE_DIR The double-conversion includes directories. +# DoubleConversion_LIBRARY The double-conversion library. 
+
+# Probe the directory-qualified header so the result is the include root used by <double-conversion/...> includes.
+find_path(DoubleConversion_INCLUDE_DIR NAMES double-conversion/double-conversion.h)
+find_library(DoubleConversion_LIBRARY NAMES libdouble-conversion.a)
+if(DoubleConversion_INCLUDE_DIR AND DoubleConversion_LIBRARY)
+    set(DoubleConversion_FOUND TRUE)
+    mark_as_advanced(
+        DoubleConversion_INCLUDE_DIR
+        DoubleConversion_LIBRARY
+    )
+endif()
+
+if(NOT DoubleConversion_FOUND)
+    message(FATAL_ERROR "Double-conversion doesn't exist")
+endif()
+
diff --git a/cmake/FindFbthrift.cmake b/cmake/FindFbthrift.cmake
new file mode 100644
index 000000000..496b8a350
--- /dev/null
+++ b/cmake/FindFbthrift.cmake
@@ -0,0 +1,33 @@
+# - Try to find Fbthrift includes dirs and libraries
+#
+# Usage of this module as follows:
+#
+#     find_package(Fbthrift)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+# Variables defined by this module:
+#
+#  Fbthrift_FOUND            System has fbthrift, thrift1 and include and lib dirs found.
+#  Fbthrift_INCLUDE_DIR      The fbthrift includes directories.
+#  Fbthrift_LIBRARY          The fbthrift library.
+#  Fbthrift_BIN              The fbthrift binary.
+ +find_path(Fbthrift_INCLUDE_DIR NAMES thrift) +find_library(Fbthrift_LIBRARY NAMES libthrift.a) +find_program(Fbthrift_BIN NAMES thrift1) + +if(Fbthrift_INCLUDE_DIR AND Fbthrift_LIBRARY AND Fbthrift_BIN) + set(Fbthrift_FOUND TRUE) + mark_as_advanced( + Fbthrift_INCLUDE_DIR + Fbthrift_LIBRARY + Fbthrift_BIN + ) +endif() + +if(NOT Fbthrift_FOUND) + message(FATAL_ERROR "Fbthrift doesn't exist") +endif() + diff --git a/cmake/FindFolly.cmake b/cmake/FindFolly.cmake new file mode 100644 index 000000000..ba4e052ba --- /dev/null +++ b/cmake/FindFolly.cmake @@ -0,0 +1,30 @@ +# - Try to find Folly includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Folly) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Folly_FOUND System has folly, include and lib dirs found +# Folly_INCLUDE_DIR The folly includes directories. +# Folly_LIBRARY The folly library. 
+ +find_path(Folly_INCLUDE_DIR NAMES folly) +find_library(Folly_LIBRARY NAMES libfolly.a libfollybenchmark.a) + +if(Folly_INCLUDE_DIR AND Folly_LIBRARY) + set(Folly_FOUND TRUE) + mark_as_advanced( + Folly_INCLUDE_DIR + Folly_LIBRARY + ) +endif() + +if(NOT Folly_FOUND) + message(FATAL_ERROR "Folly doesn't exist") +endif() + diff --git a/cmake/FindGPERF.cmake b/cmake/FindGPERF.cmake new file mode 100644 index 000000000..e2e99cbf4 --- /dev/null +++ b/cmake/FindGPERF.cmake @@ -0,0 +1,61 @@ +# FindGPERF +# --------- +# +# Find ``gperf`` executable +# +# The module defines the following variables: +# +# ``GPERF_EXECUTABLE_DIR`` +# path to search the gperf binary +# +# ``GPERF_EXECUTABLE`` +# path to the ``gperf`` program +# +# ``GPERF_BIN_DIR`` +# path to the directory that holds ``gperf`` program +# +# ``GPERF_VERSION`` +# version of ``gperf`` +# +# ``GPERF_FOUND`` +# true if the program was found +# +# The minimum required version of ``gperf`` can be specified using the +# standard CMake syntax, e.g. ``find_package(GPERF 3.0)``. 
+
+find_program(GPERF_EXECUTABLE
+    NAMES gperf
+    PATHS ${GPERF_EXECUTABLE_DIR}
+    DOC "path to the gperf executable")
+
+if(GPERF_EXECUTABLE)
+    # Directory that holds the executable, independent of the binary's exact file name
+    get_filename_component(GPERF_BIN_DIR "${GPERF_EXECUTABLE}" DIRECTORY)
+
+    # the gperf commands should be executed with the C locale, otherwise
+    # the message (which is parsed) may be translated
+    set(_gperf_SAVED_LC_ALL "$ENV{LC_ALL}")
+    set(ENV{LC_ALL} C)
+
+    execute_process(COMMAND ${GPERF_EXECUTABLE} --version
+        OUTPUT_VARIABLE GPERF_version_output
+        ERROR_VARIABLE GPERF_version_error
+        RESULT_VARIABLE GPERF_version_result
+        OUTPUT_STRIP_TRAILING_WHITESPACE)
+
+    set(ENV{LC_ALL} ${_gperf_SAVED_LC_ALL})
+
+    if(NOT "${GPERF_version_result}" EQUAL 0)  # quoted: on launch failure this holds an error string, not a number
+        message(SEND_ERROR "Command \"${GPERF_EXECUTABLE} --version\" failed with output:\n${GPERF_version_error}")
+    else()
+        if("${GPERF_version_output}" MATCHES "^GNU gperf ([^\n]+)")
+            set(GPERF_VERSION "${CMAKE_MATCH_1}")
+        endif()
+    endif()
+
+endif()
+mark_as_advanced(GPERF_EXECUTABLE GPERF_BIN_DIR)
+
+include(FindPackageHandleStandardArgs)
+FIND_PACKAGE_HANDLE_STANDARD_ARGS(GPERF REQUIRED_VARS GPERF_EXECUTABLE
+    VERSION_VAR GPERF_VERSION)
diff --git a/cmake/FindGflags.cmake b/cmake/FindGflags.cmake
new file mode 100644
index 000000000..144e1c097
--- /dev/null
+++ b/cmake/FindGflags.cmake
@@ -0,0 +1,30 @@
+# - Try to find Gflags includes dirs and libraries
+#
+# Usage of this module as follows:
+#
+#     find_package(Gflags)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+# Variables defined by this module:
+#
+#  Gflags_FOUND            System has Gflags, include and lib dirs found
+#  Gflags_INCLUDE_DIR      The Gflags includes directories.
+#  Gflags_LIBRARY          The Gflags library.
+ +find_path(Gflags_INCLUDE_DIR NAMES gflags) +find_library(Gflags_LIBRARY NAMES libgflags.a) + +if(Gflags_INCLUDE_DIR AND Gflags_LIBRARY) + set(Gflags_FOUND TRUE) + mark_as_advanced( + Gflags_INCLUDE_DIR + Gflags_LIBRARY + ) +endif() + +if(NOT Gflags_FOUND) + message(FATAL_ERROR "Gflags doesn't exist") +endif() + diff --git a/cmake/FindGlog.cmake b/cmake/FindGlog.cmake new file mode 100644 index 000000000..539418329 --- /dev/null +++ b/cmake/FindGlog.cmake @@ -0,0 +1,30 @@ +# - Try to find Glog includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Glog) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Glog_FOUND System has Glog, include and lib dirs found +# Glog_INCLUDE_DIR The Glog includes directories. +# Glog_LIBRARY The Glog library. + +find_path(Glog_INCLUDE_DIR NAMES glog) +find_library(Glog_LIBRARY NAMES libglog.a) + +if(Glog_INCLUDE_DIR AND Glog_LIBRARY) + set(Glog_FOUND TRUE) + mark_as_advanced( + Glog_INCLUDE_DIR + Glog_LIBRARY + ) +endif() + +if(NOT Glog_FOUND) + message(FATAL_ERROR "Glog doesn't exist") +endif() + diff --git a/cmake/FindGoogletest.cmake b/cmake/FindGoogletest.cmake new file mode 100644 index 000000000..cb22c8d31 --- /dev/null +++ b/cmake/FindGoogletest.cmake @@ -0,0 +1,30 @@ +# - Try to find Googletest includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Googletest) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Googletest_FOUND System has Googletest, include and lib dirs found +# Googletest_INCLUDE_DIR The Googletest includes directories. +# Googletest_LIBRARY The Googletest library. 
+ +find_path(Googletest_INCLUDE_DIR NAMES gmock gtest) +find_library(Googletest_LIBRARY NAMES libgmock.a libgmock_main.a libgtest.a libgtest_main.a) + +if(Googletest_INCLUDE_DIR AND Googletest_LIBRARY) + set(Googletest_FOUND TRUE) + mark_as_advanced( + Googletest_INCLUDE_DIR + Googletest_LIBRARY + ) +endif() + +if(NOT Googletest_FOUND) + message(FATAL_ERROR "Googletest doesn't exist") +endif() + diff --git a/cmake/FindJemalloc.cmake b/cmake/FindJemalloc.cmake new file mode 100644 index 000000000..e85a842fa --- /dev/null +++ b/cmake/FindJemalloc.cmake @@ -0,0 +1,29 @@ +# - Try to find Jemalloc includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Jemalloc) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Jemalloc_FOUND System has Jemalloc, include and lib dirs found +# Jemalloc_INCLUDE_DIR The Jemalloc includes directories. +# Jemalloc_LIBRARY The Jemalloc library. + +find_path(Jemalloc_INCLUDE_DIR NAMES jemalloc) +find_library(Jemalloc_LIBRARY NAMES libjemalloc.a) + +if(Jemalloc_INCLUDE_DIR AND Jemalloc_LIBRARY) + set(Jemalloc_FOUND TRUE) + mark_as_advanced( + Jemalloc_INCLUDE_DIR + Jemalloc_LIBRARY + ) +endif() + +if(NOT Jemalloc_FOUND) + message(FATAL_ERROR "Jemalloc doesn't exist") +endif() diff --git a/cmake/FindKrb5.cmake b/cmake/FindKrb5.cmake new file mode 100644 index 000000000..0c064097a --- /dev/null +++ b/cmake/FindKrb5.cmake @@ -0,0 +1,169 @@ +# Original source: +# https://raw.githubusercontent.com/nfs-ganesha/nfs-ganesha/master/src/cmake/modules/FindKrb5.cmake +# +# - Find kerberos 5 +# Find the native Kerberos 5 headers and libraries. +# KRB5_INCLUDE_DIRS - where to find krb5.h, etc. +# KRB5_LIBRARY_DIRS - where to find krb5 libraries. +# KRB5_LIBRARIES - List of libraries when using kerberos 5. 
+# KRB5_CFLAGS - Required cflags for KRB5, such as -I +# KRB5_LINKFLAGS - Required link flags for KRB5 +# KRB5_FOUND - True if kerberos 5 found. +# KRB5 modules may be specified as components for this find module. +# Modules may be listed by running "krb5-config". Modules include: +# krb5 Kerberos 5 application +# gssapi GSSAPI application with Kerberos 5 bindings +# krb4 Kerberos 4 application +# kadm-client Kadmin client +# kadm-server Kadmin server +# kdb Application that accesses the kerberos database +# Typical usage: +# FIND_PACKAGE(KRB5 REQUIRED gssapi) + +# First find the config script from which to obtain other values. +IF(KRB5_PREFIX) + FIND_PROGRAM(KRB5_C_CONFIG NAMES krb5-config + PATHS ${KRB5_PREFIX}/bin + NO_SYSTEM_ENVIRONMENT_PATH + NO_DEFAULT_PATH + ) +ENDIF(KRB5_PREFIX) +FIND_PROGRAM(KRB5_C_CONFIG NAMES krb5-config) + +MESSAGE(STATUS "found krb5-config here ${KRB5_C_CONFIG}") + +# Check whether we found anything. +IF(KRB5_C_CONFIG) + SET(KRB5_FOUND 1) +ELSE(KRB5_C_CONFIG) + SET(KRB5_FOUND 0) +ENDIF(KRB5_C_CONFIG) + +# Lookup the include directories needed for the components requested. +IF(KRB5_FOUND) + # Use the newer EXECUTE_PROCESS command if it is available. + IF(COMMAND EXECUTE_PROCESS) + EXECUTE_PROCESS( + COMMAND ${KRB5_C_CONFIG} ${KRB5_FIND_COMPONENTS} --cflags + OUTPUT_VARIABLE KRB5_C_CONFIG_CFLAGS + OUTPUT_STRIP_TRAILING_WHITESPACE + RESULT_VARIABLE KRB5_C_CONFIG_RESULT + ) + ELSE(COMMAND EXECUTE_PROCESS) + EXEC_PROGRAM(${KRB5_C_CONFIG} ARGS "${KRB5_FIND_COMPONENTS} --cflags" + OUTPUT_VARIABLE KRB5_C_CONFIG_CFLAGS + RETURN_VALUE KRB5_C_CONFIG_RESULT + ) + ENDIF(COMMAND EXECUTE_PROCESS) + + # Parse the include flags. + IF("${KRB5_C_CONFIG_RESULT}" MATCHES "^0$") + SET(KRB5_CFLAGS ${KRB5_C_CONFIG_CFLAGS}) + + # Convert the compile flags to a CMake list. + STRING(REGEX REPLACE " +" ";" + KRB5_C_CONFIG_CFLAGS "${KRB5_C_CONFIG_CFLAGS}") + + # Look for -I options. 
+ SET(KRB5_INCLUDE_DIRS) + FOREACH(flag ${KRB5_C_CONFIG_CFLAGS}) + IF("${flag}" MATCHES "^-I") + STRING(REGEX REPLACE "^-I" "" DIR "${flag}") + FILE(TO_CMAKE_PATH "${DIR}" DIR) + SET(KRB5_INCLUDE_DIRS ${KRB5_INCLUDE_DIRS} "${DIR}") + ENDIF("${flag}" MATCHES "^-I") + ENDFOREACH(flag) + ELSE("${KRB5_C_CONFIG_RESULT}" MATCHES "^0$") + MESSAGE("Error running ${KRB5_C_CONFIG}: [${KRB5_C_CONFIG_RESULT}]") + SET(KRB5_FOUND 0) + ENDIF("${KRB5_C_CONFIG_RESULT}" MATCHES "^0$") +ENDIF(KRB5_FOUND) + +IF(KRB5_PREFIX) + SET(KRB5_INCLUDE_DIRS "${KRB5_PREFIX}/include" ${KRB5_INCLUDE_DIRS}) +ENDIF(KRB5_PREFIX) + +# Lookup the libraries needed for the components requested. +IF(KRB5_FOUND) + # Use the newer EXECUTE_PROCESS command if it is available. + IF(COMMAND EXECUTE_PROCESS) + EXECUTE_PROCESS( + COMMAND ${KRB5_C_CONFIG} ${KRB5_FIND_COMPONENTS} --libs gssapi + OUTPUT_VARIABLE KRB5_C_CONFIG_LIBS + OUTPUT_STRIP_TRAILING_WHITESPACE + RESULT_VARIABLE KRB5_C_CONFIG_RESULT + ) + ELSE(COMMAND EXECUTE_PROCESS) + EXEC_PROGRAM(${KRB5_C_CONFIG} ARGS "${KRB5_FIND_COMPONENTS} --libs gssapi" + OUTPUT_VARIABLE KRB5_C_CONFIG_LIBS + RETURN_VALUE KRB5_C_CONFIG_RESULT + ) + ENDIF(COMMAND EXECUTE_PROCESS) + + # Parse the library names and directories. + IF("${KRB5_C_CONFIG_RESULT}" MATCHES "^0$") + SET(KRB5_LINKFLAGS ${KRB5_C_CONFIG_LIBS}) + + STRING(REGEX REPLACE " +" ";" + KRB5_C_CONFIG_LIBS "${KRB5_C_CONFIG_LIBS}") + + # Look for -L flags for directories and -l flags for library names. 
+ SET(KRB5_LIBRARY_DIRS) + SET(KRB5_LIBRARY_NAMES) + FOREACH(flag ${KRB5_C_CONFIG_LIBS}) + IF("${flag}" MATCHES "^-L") + STRING(REGEX REPLACE "^-L" "" DIR "${flag}") + FILE(TO_CMAKE_PATH "${DIR}" DIR) + SET(KRB5_LIBRARY_DIRS ${KRB5_LIBRARY_DIRS} "${DIR}") + ELSEIF("${flag}" MATCHES "^-l") + STRING(REGEX REPLACE "^-l" "" NAME "${flag}") + SET(KRB5_LIBRARY_NAMES ${KRB5_LIBRARY_NAMES} "${NAME}") + ENDIF("${flag}" MATCHES "^-L") + ENDFOREACH(flag) + + # add gssapi_krb5 (MIT) + #SET(KRB5_LIBRARY_NAMES ${KRB5_LIBRARY_NAMES} "gssapi_krb5") + # Add krb5support + SET(KRB5_LIBRARY_NAMES ${KRB5_LIBRARY_NAMES} "krb5support") + STRING(CONCAT KRB5_LINKFLAGS ${KRB5_LINKFLAGS} " -lkrb5support") + + # Search for each library needed using the directories given. + FOREACH(name ${KRB5_LIBRARY_NAMES}) + # Look for this library. + FIND_LIBRARY(KRB5_${name}_LIBRARY + NAMES ${name} + PATHS ${KRB5_LIBRARY_DIRS} + NO_DEFAULT_PATH + ) + FIND_LIBRARY(KRB5_${name}_LIBRARY NAMES ${name}) + MARK_AS_ADVANCED(KRB5_${name}_LIBRARY) + + # If any library is not found then the whole package is not found. + IF(NOT KRB5_${name}_LIBRARY) + SET(KRB5_FOUND 0) + ENDIF(NOT KRB5_${name}_LIBRARY) + + # Build an ordered list of all the libraries needed. + SET(KRB5_LIBRARIES ${KRB5_LIBRARIES} "${KRB5_${name}_LIBRARY}") + ENDFOREACH(name) + ELSE("${KRB5_C_CONFIG_RESULT}" MATCHES "^0$") + MESSAGE("Error running ${KRB5_C_CONFIG}: [${KRB5_C_CONFIG_RESULT}]") + SET(KRB5_FOUND 0) + ENDIF("${KRB5_C_CONFIG_RESULT}" MATCHES "^0$") +ENDIF(KRB5_FOUND) + +# Report the results. +IF(NOT KRB5_FOUND) + SET(KRB5_DIR_MESSAGE + "KRB5 was not found. 
Make sure the entries KRB5_* are set.")
+  IF(Krb5_FIND_REQUIRED OR KRB5_FIND_REQUIRED)
+    # REQUIRED must stop configuration whether or not QUIET was given; both spellings of the package name are honoured.
+    MESSAGE(FATAL_ERROR "${KRB5_DIR_MESSAGE}")
+  ELSEIF(NOT Krb5_FIND_QUIETLY AND NOT KRB5_FIND_QUIETLY)
+    # Optional lookup: report the miss unless the caller asked for silence.
+    MESSAGE(STATUS "${KRB5_DIR_MESSAGE}")
+  ENDIF()
+ELSE(NOT KRB5_FOUND)
+  MESSAGE(STATUS "Found kerberos 5 headers: ${KRB5_INCLUDE_DIRS}")
+  MESSAGE(STATUS "Found kerberos 5 libs: ${KRB5_LIBRARIES}")
+ENDIF(NOT KRB5_FOUND)
diff --git a/cmake/FindLibevent.cmake b/cmake/FindLibevent.cmake
new file mode 100644
index 000000000..4d314af9a
--- /dev/null
+++ b/cmake/FindLibevent.cmake
@@ -0,0 +1,30 @@
+# - Try to find Libevent includes dirs and libraries
+#
+# Usage of this module as follows:
+#
+#     find_package(Libevent)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+# Variables defined by this module:
+#
+#  Libevent_FOUND            System has Libevent, include and lib dirs found
+#  Libevent_INCLUDE_DIR      The Libevent includes directories.
+#  Libevent_LIBRARY          The Libevent library.
+
+find_path(Libevent_INCLUDE_DIR NAMES event.h)
+find_library(Libevent_LIBRARY NAMES libevent.a)
+
+if(Libevent_INCLUDE_DIR AND Libevent_LIBRARY)
+    set(Libevent_FOUND TRUE)
+    mark_as_advanced(
+        Libevent_INCLUDE_DIR
+        Libevent_LIBRARY
+    )
+endif()
+
+if(NOT Libevent_FOUND)
+    message(FATAL_ERROR "Libevent doesn't exist")
+endif()
+
diff --git a/cmake/FindLibunwind.cmake b/cmake/FindLibunwind.cmake
new file mode 100644
index 000000000..3be74fa8a
--- /dev/null
+++ b/cmake/FindLibunwind.cmake
@@ -0,0 +1,24 @@
+# FindLibunwind
+# -------------
+#
+# Find Libunwind
+#
+# Find LibUnwind library
+#
+# ::
+#
+#   LIBUNWIND_LIBRARY_DIR - Can be provided to advise the search
+#
+#   Libunwind_FOUND - True if libunwind is found.
+#   LIBUNWIND_LIBRARIES - libunwind libraries to link against.
+ +find_library(LIBUNWIND_LIBRARY NAMES unwind libunwind.so.8 PATHS ${LIBUNWIND_LIBRARY_DIR}) + +include(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS(Libunwind REQUIRED_VARS LIBUNWIND_LIBRARY) + +if (Libunwind_FOUND) + set(LIBUNWIND_LIBRARIES ${LIBUNWIND_LIBRARY}) +endif () + +mark_as_advanced( LIBUNWIND_LIBRARY ) diff --git a/cmake/FindMstch.cmake b/cmake/FindMstch.cmake new file mode 100644 index 000000000..6ed969b92 --- /dev/null +++ b/cmake/FindMstch.cmake @@ -0,0 +1,31 @@ +# - Try to find Mstch includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Mstch) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Mstch_FOUND System has Mstch, include and lib dirs found +# Mstch_INCLUDE_DIR The Mstch includes directories. +# Mstch_LIBRARY The Mstch library. + +find_path(Mstch_INCLUDE_DIR NAMES mstch) +find_library(Mstch_LIBRARY NAMES libmstch.a) +message(STATUS "Mstch_INCLUDE_DIR = ${Mstch_INCLUDE_DIR} , Mstch_LIBRARY = ${Mstch_LIBRARY}") + +if(Mstch_INCLUDE_DIR AND Mstch_LIBRARY) + set(Mstch_FOUND TRUE) + mark_as_advanced( + Mstch_INCLUDE_DIR + Mstch_LIBRARY + ) +endif() + +if(NOT Mstch_FOUND) + message(FATAL_ERROR "Mstch doesn't exist") +endif() + diff --git a/cmake/FindProxygen.cmake b/cmake/FindProxygen.cmake new file mode 100644 index 000000000..9caef4893 --- /dev/null +++ b/cmake/FindProxygen.cmake @@ -0,0 +1,31 @@ +# - Try to find Proxygen includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Proxygen) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Proxygen_FOUND System has Proxygen, include and lib dirs found +# Proxygen_INCLUDE_DIR The Proxygen includes directories. +# Proxygen_LIBRARY The Proxygen library. 
+ +find_path(Proxygen_INCLUDE_DIR NAMES proxygen) +find_library(Proxygen_LIBRARY NAMES libproxygenlib.a) + +if(Proxygen_INCLUDE_DIR AND Proxygen_LIBRARY) + set(Proxygen_FOUND TRUE) + mark_as_advanced( + Proxygen_INCLUDE_DIR + Proxygen_LIBRARY + ) +endif(Proxygen_INCLUDE_DIR AND Proxygen_LIBRARY) + +if(NOT Proxygen_FOUND) + message(FATAL_ERROR "Proxygen doesn't exist") +endif() + + diff --git a/cmake/FindSnappy.cmake b/cmake/FindSnappy.cmake new file mode 100644 index 000000000..bb7b5d199 --- /dev/null +++ b/cmake/FindSnappy.cmake @@ -0,0 +1,30 @@ +# - Try to find Snappy includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Snappy) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Snappy_FOUND System has Snappy, include and lib dirs found +# Snappy_INCLUDE_DIR The Snappy includes directories. +# Snappy_LIBRARY The Snappy library. + +find_path(Snappy_INCLUDE_DIR NAMES snappy.h) +find_library(Snappy_LIBRARY NAMES libsnappy.a) + +if(Snappy_INCLUDE_DIR AND Snappy_LIBRARY) + set(Snappy_FOUND TRUE) + mark_as_advanced( + Snappy_INCLUDE_DIR + Snappy_LIBRARY + ) +endif() + +if(NOT Snappy_FOUND) + message(FATAL_ERROR "Snappy doesn't exist") +endif() + diff --git a/cmake/FindWangle.cmake b/cmake/FindWangle.cmake new file mode 100644 index 000000000..d3e55e04a --- /dev/null +++ b/cmake/FindWangle.cmake @@ -0,0 +1,30 @@ +# - Try to find Wangle includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Wangle) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Wangle_FOUND System has Wangle, include and lib dirs found +# Wangle_INCLUDE_DIR The Wangle includes directories. +# Wangle_LIBRARY The Wangle library. 
+ +find_path(Wangle_INCLUDE_DIR NAMES wangle) +find_library(Wangle_LIBRARY NAMES libwangle.a) + +if(Wangle_INCLUDE_DIR AND Wangle_LIBRARY) + set(Wangle_FOUND TRUE) + mark_as_advanced( + Wangle_INCLUDE_DIR + Wangle_LIBRARY + ) +endif() + +if(NOT Wangle_FOUND) + message(FATAL_ERROR "Wangle doesn't exist") +endif() + diff --git a/cmake/FindZstd.cmake b/cmake/FindZstd.cmake new file mode 100644 index 000000000..1c49a8717 --- /dev/null +++ b/cmake/FindZstd.cmake @@ -0,0 +1,34 @@ +# - Try to find Zstd includes dirs and libraries +# +# Usage of this module as follows: +# +# find_package(Zstd) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# Variables defined by this module: +# +# Zstd_FOUND System has Zstd, include and lib dirs found +# Zstd_INCLUDE_DIR The Zstd includes directories. +# Zstd_LIBRARY The Zstd library. +# Zstd_BIN The Zstd binary. + +find_path(Zstd_INCLUDE_DIR NAMES zstd.h) +find_library(Zstd_LIBRARY NAMES libzstd.a) +find_program(Zstd_BIN NAMES zstd) + +if(Zstd_INCLUDE_DIR AND Zstd_LIBRARY AND Zstd_BIN) + set(Zstd_FOUND TRUE) + mark_as_advanced( + Zstd_INCLUDE_DIR + Zstd_LIBRARY + Zstd_BIN + ) +endif() + +if(NOT Zstd_FOUND) + message(FATAL_ERROR "Zstd doesn't exist") +endif() + + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 44f0a3900..40883128a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,7 @@ -nebula_add_subdirectory(daemons) -nebula_add_subdirectory(dataman) -nebula_add_subdirectory(kvstore) -nebula_add_subdirectory(meta) -nebula_add_subdirectory(jni) -nebula_add_subdirectory(storage) -nebula_add_subdirectory(tools) +#nebula_add_subdirectory(daemons) +nebula_add_subdirectory(codec) +#nebula_add_subdirectory(kvstore) +#nebula_add_subdirectory(meta) +#nebula_add_subdirectory(jni) +#nebula_add_subdirectory(storage) +#nebula_add_subdirectory(tools) diff --git a/src/codec/CMakeLists.txt b/src/codec/CMakeLists.txt new file mode 100644 index 
000000000..c35fa8a81 --- /dev/null +++ b/src/codec/CMakeLists.txt @@ -0,0 +1,7 @@ +nebula_add_library( + codec_obj OBJECT + RowReader.cpp + RowReaderV1.cpp +) + +nebula_add_subdirectory(test) diff --git a/src/codec/Common.h b/src/codec/Common.h new file mode 100644 index 000000000..a655ad229 --- /dev/null +++ b/src/codec/Common.h @@ -0,0 +1,88 @@ +/* Copyright (c) 2018 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef CODEC_COMMON_H_ +#define CODEC_COMMON_H_ + +#include "base/Base.h" + +namespace nebula { + +template +typename std::enable_if< + std::is_integral< + typename std::remove_cv< + typename std::remove_reference::type + >::type + >::value, + bool +>::type +intToBool(IntType iVal) { + return iVal != 0; +} + + +inline bool strToBool(folly::StringPiece str) { + return str == "Y" || str == "y" || str == "T" || str == "t" || + str == "yes" || str == "Yes" || str == "YES" || + str == "true" || str == "True" || str == "TRUE"; +} + + +inline std::string toHexStr(folly::StringPiece str) { + static const char* hex[] = { + "00", "01", "02", "03", "04", "05", "06", "07", + "08", "09", "0A", "0B", "0C", "0D", "0E", "0F", + "10", "11", "12", "13", "14", "15", "16", "17", + "18", "19", "1A", "1B", "1C", "1D", "1E", "1F", + "20", "21", "22", "23", "24", "25", "26", "27", + "28", "29", "2A", "2B", "2C", "2D", "2E", "2F", + "30", "31", "32", "33", "34", "35", "36", "37", + "38", "39", "3A", "3B", "3C", "3D", "3E", "3F", + "40", "41", "42", "43", "44", "45", "46", "47", + "48", "49", "4A", "4B", "4C", "4D", "4E", "4F", + "50", "51", "52", "53", "54", "55", "56", "57", + "58", "59", "5A", "5B", "5C", "5D", "5E", "5F", + "60", "61", "62", "63", "64", "65", "66", "67", + "68", "69", "6A", "6B", "6C", "6D", "6E", "6F", + "70", "71", "72", "73", "74", "75", "76", "77", + "78", "79", "7A", "7B", "7C", "7D", "7E", "7F", + "80", "81", "82", 
"83", "84", "85", "86", "87", + "88", "89", "8A", "8B", "8C", "8D", "8E", "8F", + "90", "91", "92", "93", "94", "95", "96", "97", + "98", "99", "9A", "9B", "9C", "9D", "9E", "9F", + "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", + "A8", "A9", "AA", "AB", "AC", "AD", "AE", "AF", + "B0", "B1", "B2", "B3", "B4", "B5", "B6", "B7", + "B8", "B9", "BA", "BB", "BC", "BD", "BE", "BF", + "C0", "C1", "C2", "C3", "C4", "C5", "C6", "C7", + "C8", "C9", "CA", "CB", "CC", "CD", "CE", "CF", + "D0", "D1", "D2", "D3", "D4", "D5", "D6", "D7", + "D8", "D9", "DA", "DB", "DC", "DD", "DE", "DF", + "E0", "E1", "E2", "E3", "E4", "E5", "E6", "E7", + "E8", "E9", "EA", "EB", "EC", "ED", "EE", "EF", + "F0", "F1", "F2", "F3", "F4", "F5", "F6", "F7", + "F8", "F9", "FA", "FB", "FC", "FD", "FE", "FF"}; + + if (str.empty()) { + return std::string(); + } + + std::string buf; + buf.reserve(str.size() * 3 - 1); + + buf.append(hex[static_cast(str[0])]); + for (size_t i = 1; i < str.size(); i++) { + buf.append(" "); + buf.append(hex[static_cast(str[i])]); + } + + return buf; +} + +} // namespace nebula +#endif // CODEC_COMMON_H_ + diff --git a/src/dataman/NebulaCodecImpl.cpp b/src/codec/NebulaCodecImpl.cpp similarity index 100% rename from src/dataman/NebulaCodecImpl.cpp rename to src/codec/NebulaCodecImpl.cpp diff --git a/src/dataman/NebulaCodecImpl.h b/src/codec/NebulaCodecImpl.h similarity index 100% rename from src/dataman/NebulaCodecImpl.h rename to src/codec/NebulaCodecImpl.h diff --git a/src/dataman/README.md b/src/codec/README.md similarity index 100% rename from src/dataman/README.md rename to src/codec/README.md diff --git a/src/codec/RowReader.cpp b/src/codec/RowReader.cpp new file mode 100644 index 000000000..2d8d32fde --- /dev/null +++ b/src/codec/RowReader.cpp @@ -0,0 +1,157 @@ +/* Copyright (c) 2018 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */
+
+#include "base/Base.h"
+#include "codec/RowReader.h"
+#include "codec/RowReaderV1.h"
+
+namespace nebula {
+
+/*********************************************
+ *
+ * class RowReader::Cell
+ *
+ ********************************************/
+Value RowReader::Cell::value() const noexcept {
+    return iter_->reader_->getValueByIndex(iter_->index_);
+}
+
+
+/*********************************************
+ *
+ * class RowReader::Iterator
+ *
+ ********************************************/
+RowReader::Iterator::Iterator(Iterator&& iter)
+    : reader_(iter.reader_)
+    , cell_(this)  // a Cell points back at its owning iterator; copying iter.cell_ would alias the moved-from one
+    , index_(iter.index_) {
+}
+
+
+void RowReader::Iterator::operator=(Iterator&& rhs) {
+    reader_ = rhs.reader_;
+    // cell_ is deliberately left untouched: it already refers to *this, whereas rhs.cell_ refers to rhs
+    index_ = rhs.index_;
+}
+
+
+bool RowReader::Iterator::operator==(const Iterator& rhs) const noexcept {
+    return reader_ == rhs.reader_ && index_ == rhs.index_;
+}
+
+
+const RowReader::Cell& RowReader::Iterator::operator*() const noexcept {
+    return cell_;
+}
+
+
+const RowReader::Cell* RowReader::Iterator::operator->() const noexcept {
+    return &cell_;
+}
+
+
+RowReader::Iterator& RowReader::Iterator::operator++() {
+    if (index_ < reader_->numFields()) {
+        ++index_;
+    }
+    return *this;
+}
+
+
+/*********************************************
+ *
+ * class RowReader
+ *
+ ********************************************/
+// static
+/*
+std::unique_ptr RowReader::getTagPropReader(
+        meta::SchemaManager* schemaMan,
+        GraphSpaceID space,
+        TagID tag,
+        folly::StringPiece row) {
+    CHECK_NOTNULL(schemaMan);
+    int32_t ver = getSchemaVer(row);
+    if (ver >= 0) {
+        return std::unique_ptr(
+            new RowReaderV1(row, schemaMan->getTagSchema(space, tag, ver)));
+    }
+
+    // Invalid data
+    // TODO We need a better error handler here
+    LOG(FATAL) << "Invalid schema version in the row data!";
+}
+
+
+// static
+std::unique_ptr RowReader::getEdgePropReader(
+        meta::SchemaManager* schemaMan,
+        GraphSpaceID space,
+        EdgeType edge,
+        folly::StringPiece row) {
+    CHECK_NOTNULL(schemaMan);
+    int32_t ver = getSchemaVer(row);
+    if (ver >= 0) {
+        return std::unique_ptr(
+            new RowReaderV1(row, schemaMan->getEdgeSchema(space, edge, ver)));
+    }
+
+    // Invalid data
+    // TODO We need a better error handler here
+    LOG(FATAL) << "Invalid schema version in the row data!";
+}
+*/
+
+// static
+std::unique_ptr<RowReader> RowReader::getRowReader(
+        std::shared_ptr<const meta::SchemaProviderIf> schema,  // NOTE(review): element type was stripped from this patch text; confirm against RowReader.h
+        folly::StringPiece row) {
+    SchemaVer ver = getSchemaVer(row);
+    CHECK_EQ(ver, schema->getVersion());
+    return std::unique_ptr<RowReader>(new RowReaderV1(row, std::move(schema)));
+}
+
+
+// static
+SchemaVer RowReader::getSchemaVer(folly::StringPiece row) {
+    const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
+    if (reinterpret_cast<const char*>(it) == row.end()) {
+        LOG(ERROR) << "Row data is empty, so there is no schema version";
+        return 0;
+    }
+
+    // The first three bits indicate the number of bytes for the
+    // schema version. If the number is zero, no schema version
+    // is present
+    size_t verBytes = *(it++) >> 5;
+    uint64_t ver = 0;  // 3 header bits allow up to 7 version bytes, so accumulate in 64 bits
+    if (verBytes > 0) {
+        if (verBytes + 1 > row.size()) {
+            // Data is too short
+            LOG(ERROR) << "Row data is too short: " << toHexStr(row);
+            return 0;
+        }
+        // Schema Version is stored in Little Endian
+        for (size_t i = 0; i < verBytes; i++) {
+            ver |= (static_cast<uint64_t>(*(it++)) << (8 * i));  // shift stays below 64 (max 8 * 6)
+        }
+    }
+
+    return static_cast<SchemaVer>(ver);
+}
+
+
+const Value& RowReader::getDefaultValue(const std::string& prop) {
+    auto field = schema_->field(prop);
+    if (!!field && field->hasDefault()) {
+        return field->defaultValue();
+    }
+
+    return Value::null();
+}
+
+}  // namespace nebula
diff --git a/src/codec/RowReader.h b/src/codec/RowReader.h
new file mode 100644
index 000000000..c763ad9b4
--- /dev/null
+++ b/src/codec/RowReader.h
@@ -0,0 +1,125 @@
+/* Copyright (c) 2018 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ + +#ifndef CODEC_ROWREADER_H_ +#define CODEC_ROWREADER_H_ + +#include "base/Base.h" +#include "datatypes/Value.h" +#include "codec/Common.h" +#include "meta/SchemaProviderIf.h" + +namespace nebula { + +/** + * This class decodes one row of data + */ +class RowReader { +public: + class Iterator; + + class Cell final { + friend class Iterator; + public: + Value value() const noexcept; + + private: + const Iterator* iter_; + + explicit Cell(const Iterator* iter) : iter_(iter) {} + }; + + + class Iterator final { + friend class Cell; + friend class RowReader; + public: + Iterator(Iterator&& iter); + + void operator=(Iterator&& rhs); + + const Cell& operator*() const noexcept; + const Cell* operator->() const noexcept; + + Iterator& operator++(); + + bool operator==(const Iterator& rhs) const noexcept; + bool operator!=(const Iterator& rhs) const noexcept { + return ! operator==(rhs); + } + + private: + const RowReader* reader_; + Cell cell_; + size_t index_; + + Iterator(const RowReader* reader, size_t index = 0) + : reader_(reader), cell_(this), index_(index) {} + }; + + +public: +/* + static std::unique_ptr getTagPropReader( + meta::SchemaManager* schemaMan, + GraphSpaceID space, + TagID tag, + folly::StringPiece row); + static std::unique_ptr getEdgePropReader( + meta::SchemaManager* schemaMan, + GraphSpaceID space, + EdgeType edge, + folly::StringPiece row); +*/ + static std::unique_ptr getRowReader( + std::shared_ptr schema, + folly::StringPiece row); + + virtual ~RowReader() = default; + + virtual Value getValueByName(const std::string& prop) const noexcept = 0; + virtual Value getValueByIndex(const int64_t index) const noexcept = 0; + + const Value& getDefaultValue(const std::string& prop); + + Iterator begin() const noexcept { + return Iterator(this, 0); + } + + const Iterator& end() const noexcept { + return endIter_; + } + + SchemaVer schemaVer() const noexcept { + return schema_->getVersion(); + } + + size_t numFields() const noexcept { + return 
schema_->getNumFields(); + } + + std::shared_ptr getSchema() const { + return schema_; + } + +protected: + std::shared_ptr schema_; + folly::StringPiece data_; + + explicit RowReader(std::shared_ptr schema, + folly::StringPiece row) + : schema_(schema) + , data_(row) + , endIter_(this, schema_->getNumFields()) {} + + static SchemaVer getSchemaVer(folly::StringPiece row); + +private: + const Iterator endIter_; +}; + +} // namespace nebula +#endif // CODEC_ROWREADER_H_ diff --git a/src/codec/RowReaderV1.cpp b/src/codec/RowReaderV1.cpp new file mode 100644 index 000000000..d97cad3f6 --- /dev/null +++ b/src/codec/RowReaderV1.cpp @@ -0,0 +1,507 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "codec/RowReaderV1.h" +#include "interface/gen-cpp2/meta_types.h" + + +#define RR_GET_OFFSET() \ + if (index >= static_cast(schema_->getNumFields())) { \ + return NullType::BAD_DATA; \ + } \ + int64_t offset = skipToField(index); \ + if (offset < 0) { \ + return NullType::BAD_DATA; \ + } + +namespace nebula { + +/********************************************* + * + * class RowReaderV1 + * + ********************************************/ +RowReaderV1::RowReaderV1(folly::StringPiece row, + std::shared_ptr schema) + : RowReader(schema, row) { + CHECK(!!schema_) << "A schema must be provided"; + + if (processHeader(row)) { + // data_.begin() points to the first field + data_.reset(row.begin() + headerLen_, row.size() - headerLen_); + } else { + // Invalid data + LOG(FATAL) << "Invalid row data: " << toHexStr(row); + } +} + + +bool RowReaderV1::processHeader(folly::StringPiece row) { + const uint8_t* it = reinterpret_cast(row.begin()); + if (reinterpret_cast(it) == row.end()) { + return false; + } + + DCHECK(!!schema_) << "A schema must be provided"; + + // The last three bits indicate the number of bytes for offsets + // 
The first three bits indicate the number of bytes for the + // schema version. If the number is zero, no schema version + // presents + numBytesForOffset_ = (*it & 0x07) + 1; + int32_t verBytes = *(it++) >> 5; + it += verBytes; + + // Process the block offsets + // Block offsets point to the start of every 16 fields, except the + // first 16 fields + // Block offsets are stored in Little Endian + uint32_t numFields = schema_->getNumFields(); + uint32_t numOffsets = (numFields >> 4); + if (numBytesForOffset_ * numOffsets + verBytes + 1 > row.size()) { + // Data is too short + LOG(ERROR) << "Row data is too short: " << toHexStr(row); + return false; + } + offsets_.resize(numFields + 1, -1); + offsets_[0] = 0; + blockOffsets_.emplace_back(0, 0); + blockOffsets_.reserve(numOffsets); + for (uint32_t i = 0; i < numOffsets; i++) { + int64_t offset = 0; + for (int32_t j = 0; j < numBytesForOffset_; j++) { + offset |= (uint64_t(*(it++)) << (8 * j)); + } + blockOffsets_.emplace_back(offset, 0); + offsets_[16 * (i + 1)] = offset; + } + // Now done with the header + + headerLen_ = reinterpret_cast(it) - row.begin(); + offsets_[numFields] = row.size() - headerLen_; + + return true; +} + + +int64_t RowReaderV1::skipToNext(int64_t index, int64_t offset) const noexcept { + const meta::cpp2::PropertyType& vType = getSchema()->getFieldType(index); + if (offsets_[index + 1] >= 0) { + return offsets_[index + 1]; + } + + switch (vType) { + case meta::cpp2::PropertyType::BOOL: { + // One byte + offset++; + break; + } + case meta::cpp2::PropertyType::INT64: + case meta::cpp2::PropertyType::TIMESTAMP: { + int64_t v; + int32_t len = readInteger(offset, v); + if (len <= 0) { + return -1; + } + offset += len; + break; + } + case meta::cpp2::PropertyType::FLOAT: { + // Four bytes + offset += sizeof(float); + break; + } + case meta::cpp2::PropertyType::DOUBLE: { + // Eight bytes + offset += sizeof(double); + break; + } + case meta::cpp2::PropertyType::STRING: { + int64_t strLen; + int32_t 
intLen = readInteger(offset, strLen); + if (intLen <= 0) { + return -1; + } + offset += intLen + strLen; + break; + } + case meta::cpp2::PropertyType::VID: { + // Eight bytes + offset += sizeof(int64_t); + break; + } + default: { + // TODO + LOG(FATAL) << "Unimplemented"; + } + } + + if (offset > static_cast(data_.size())) { + return -1; + } + + // Update offsets + offsets_[index + 1] = offset; + // Update block offsets + int32_t base = (index + 1) >> 4; + blockOffsets_[base].second = ((index + 1) & 0x0F); + + return offset; +} + + +int64_t RowReaderV1::skipToField(int64_t index) const noexcept { + DCHECK_GE(index, 0); + if (index >= static_cast(schema_->getNumFields())) { + // Index is out of range + return -1; + } + + int64_t base = index >> 4; + const auto& blockOffset = blockOffsets_[base]; + base <<= 4; + int64_t maxVisitedIndex = base + blockOffset.second; + if (index <= maxVisitedIndex) { + return offsets_[index]; + } + + int64_t offset = offsets_[maxVisitedIndex]; + for (int64_t i = maxVisitedIndex; i < base + (index & 0x0000000f); i++) { + offset = skipToNext(i, offset); + if (offset < 0) { + return -1; + } + } + + return offset; +} + + +/************************************************************ + * + * Get the property value + * + ***********************************************************/ +Value RowReaderV1::getValueByName(const std::string& prop) const noexcept { + int64_t index = getSchema()->getFieldIndex(prop); + return getValueByIndex(index); +} + + +Value RowReaderV1::getValueByIndex(const int64_t index) const noexcept { + auto vType = getSchema()->getFieldType(index); + switch (vType) { + case meta::cpp2::PropertyType::BOOL: + return getBool(index); + case meta::cpp2::PropertyType::INT64: + case meta::cpp2::PropertyType::TIMESTAMP: + return getInt(index); + case meta::cpp2::PropertyType::VID: + return getVid(index); + case meta::cpp2::PropertyType::FLOAT: + return getFloat(index); + case meta::cpp2::PropertyType::DOUBLE: + return 
getDouble(index); + case meta::cpp2::PropertyType::STRING: + return getString(index); + default: + LOG(ERROR) << "Unknown type: " << static_cast(vType); + return NullType::BAD_TYPE; + } +} + + +/************************************************************ + * + * Get the property value from the serialized binary string + * + ***********************************************************/ +Value RowReaderV1::getBool(int64_t index) const noexcept { + RR_GET_OFFSET() + Value v; + switch (getSchema()->getFieldType(index)) { + case meta::cpp2::PropertyType::BOOL: { + v.setBool(intToBool(data_[offset])); + offset++; + break; + } + case meta::cpp2::PropertyType::INT64: { + int64_t intV; + int32_t numBytes = readInteger(offset, intV); + if (numBytes > 0) { + v.setBool(intToBool(intV)); + offset += numBytes; + } else { + v.setNull(NullType::BAD_DATA); + } + break; + } + case meta::cpp2::PropertyType::STRING: { + folly::StringPiece strV; + int32_t numBytes = readString(offset, strV); + if (numBytes > 0) { + v.setBool(strToBool(strV)); + offset += numBytes; + } else { + v.setNull(NullType::BAD_DATA); + } + break; + } + default: { + v.setNull(NullType::BAD_TYPE); + } + } + + return v; +} + + +Value RowReaderV1::getInt(int64_t index) const noexcept { + RR_GET_OFFSET() + Value v; + switch (getSchema()->getFieldType(index)) { + case meta::cpp2::PropertyType::INT64: + case meta::cpp2::PropertyType::TIMESTAMP: { + int64_t val; + int32_t numBytes = readInteger(offset, val); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setInt(val); + offset += numBytes; + } + break; + } + default: { + v.setNull(NullType::BAD_TYPE); + } + } + + return v; +} + + +Value RowReaderV1::getFloat(int64_t index) const noexcept { + RR_GET_OFFSET() + Value v; + switch (getSchema()->getFieldType(index)) { + case meta::cpp2::PropertyType::FLOAT: { + float f; + int32_t numBytes = readFloat(offset, f); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setFloat(f); + offset 
+= numBytes; + } + break; + } + case meta::cpp2::PropertyType::DOUBLE: { + double d; + int32_t numBytes = readDouble(offset, d); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + if (d < std::numeric_limits::lowest() ||  // lowest(), not min(): min() is the smallest positive value + d > std::numeric_limits::max()) { + v.setNull(NullType::OVERFLOW); + } else { + v.setFloat(d); + } + offset += numBytes; + } + break; + } + default: { + v.setNull(NullType::BAD_TYPE); + } + } + + return v; +} + + +Value RowReaderV1::getDouble(int64_t index) const noexcept { + RR_GET_OFFSET() + Value v; + switch (getSchema()->getFieldType(index)) { + case meta::cpp2::PropertyType::FLOAT: { + float f; + int32_t numBytes = readFloat(offset, f); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setFloat(f); + offset += numBytes; + } + break; + } + case meta::cpp2::PropertyType::DOUBLE: { + double d; + int32_t numBytes = readDouble(offset, d); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setFloat(d); + offset += numBytes; + } + break; + } + default: { + v.setNull(NullType::BAD_TYPE); + } + } + + return v; +} + + +Value RowReaderV1::getString(int64_t index) const noexcept { + RR_GET_OFFSET() + Value v; + switch (getSchema()->getFieldType(index)) { + case meta::cpp2::PropertyType::STRING: { + folly::StringPiece s; + int32_t numBytes = readString(offset, s); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setStr(s.toString()); + offset += numBytes; + } + break; + } + default: { + v.setNull(NullType::BAD_TYPE); + } + } + + return v; +} + + +Value RowReaderV1::getInt64(int64_t index) const noexcept { + RR_GET_OFFSET() + Value v; + int64_t val; + switch (getSchema()->getFieldType(index)) { + case meta::cpp2::PropertyType::INT64: + case meta::cpp2::PropertyType::TIMESTAMP: { + int32_t numBytes = readInteger(offset, val); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setInt(val); + offset += numBytes; + } + break; + } + case meta::cpp2::PropertyType::VID: 
{ + int32_t numBytes = readVid(offset, val); + if (numBytes < 0) { + v.setNull(NullType::BAD_DATA); + } else { + v.setInt(val); + offset += numBytes; + } + break; + } + default: { + v.setNull(NullType::BAD_TYPE); + } + } + + return v; +} + + +Value RowReaderV1::getVid(int64_t index) const noexcept { + auto fieldType = getSchema()->getFieldType(index); + if (fieldType == meta::cpp2::PropertyType::INT64 || + fieldType == meta::cpp2::PropertyType::VID) { + // Since 2.0, vid has been defined as a binary array. So we need to convert + // the int64 vid in 1.0 to a binary array + Value v(getInt64(index)); + if (v.type() != Value::Type::INT) return v;  // corrupt row: hand back the null instead of aborting + int64_t vid = v.getInt(); + v.setStr(std::string(reinterpret_cast(&vid), sizeof(int64_t))); + return v; + } else { + return NullType::BAD_TYPE; + } +} + + +/************************************************************ + * + * Low-level functions to read from the bytes + * + ***********************************************************/ +int32_t RowReaderV1::readInteger(int64_t offset, int64_t& v) const noexcept { + const uint8_t* start = reinterpret_cast(&(data_[offset])); + folly::ByteRange range(start, data_.size() - offset); + + try { + v = folly::decodeVarint(range); + } catch (const std::exception& ex) { + return -1; + } + return range.begin() - start; +} + + +int32_t RowReaderV1::readFloat(int64_t offset, float& v) const noexcept { + if (offset + sizeof(float) > data_.size()) { + return -1; + } + + memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(float)); + + return sizeof(float); +} + + +int32_t RowReaderV1::readDouble(int64_t offset, double& v) const noexcept { + if (offset + sizeof(double) > data_.size()) { + return -1; + } + + memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(double)); + + return sizeof(double); +} + + +int32_t RowReaderV1::readString(int64_t offset, folly::StringPiece& v) const noexcept { + int64_t strLen; + int32_t intLen = readInteger(offset, strLen); + CHECK_GT(intLen, 0) << "Invalid string 
length"; + if (offset + intLen + strLen > static_cast(data_.size())) { + return -1; + } + + v = data_.subpiece(offset + intLen, strLen); + return intLen + strLen; +} + + +int32_t RowReaderV1::readInt64(int64_t offset, int64_t& v) const noexcept { + if (offset + sizeof(int64_t) > data_.size()) { + return -1; + } + + // VID is stored in Little Endian + memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(int64_t)); + + return sizeof(int64_t); +} + + +int32_t RowReaderV1::readVid(int64_t offset, int64_t& v) const noexcept { + return readInt64(offset, v); +} + +} // namespace nebula diff --git a/src/codec/RowReaderV1.h b/src/codec/RowReaderV1.h new file mode 100644 index 000000000..f2ab300bb --- /dev/null +++ b/src/codec/RowReaderV1.h @@ -0,0 +1,87 @@ +/* Copyright (c) 2018 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef CODEC_ROWREADERV1_H_ +#define CODEC_ROWREADERV1_H_ + +#include "base/Base.h" +#include +#include "codec/RowReader.h" + +namespace nebula { + +/** + * This class decodes the data from version 1.0 + */ +class RowReaderV1 : public RowReader { + friend class RowReader; + + FRIEND_TEST(RowReaderV1, headerInfo); + FRIEND_TEST(RowReaderV1, encodedData); + FRIEND_TEST(RowReaderV1, iterator); + +public: + Value getValueByName(const std::string& prop) const noexcept override; + Value getValueByIndex(const int64_t index) const noexcept override; + +protected: + RowReaderV1(folly::StringPiece row, + std::shared_ptr schema); + +private: + int32_t headerLen_ = 0; + int32_t numBytesForOffset_ = 0; + // Block offet value is composed by two integers. The first one is + // the block offset, the second one is the largest index being visited + // in the block. 
This index is zero-based + mutable std::vector> blockOffsets_; + mutable std::vector offsets_; + +private: + // Process the row header information + // Returns false when the row data is invalid + bool processHeader(folly::StringPiece row); + + // Process the block offsets (each block contains a certain number of fields) + // Returns false when the row data is invalid + bool processBlockOffsets(folly::StringPiece row, int32_t verBytes); + + // Skip to the next field + // Parameter: + // index : the current field index + // offset : the current offset + // On success, the method returns the offset pointing to the + // next field + // On failure, the method returns a negative number + int64_t skipToNext(int64_t index, int64_t offset) const noexcept; + + // Skip to the {index}Th field + // The method returns the offset of the field + // It returns a negative number when the data is corrupted + int64_t skipToField(int64_t index) const noexcept; + + // The following methods assume the index parameter is valid + // On success, offset will advance + Value getBool(int64_t index) const noexcept; + Value getInt(int64_t index) const noexcept; + Value getFloat(int64_t index) const noexcept; + Value getDouble(int64_t index) const noexcept; + Value getString(int64_t index) const noexcept; + Value getInt64(int64_t index) const noexcept; + Value getVid(int64_t index) const noexcept; + + // The following methods all return the number of bytes read + // A negative number will be returned if an error occurs + int32_t readInteger(int64_t offset, int64_t& v) const noexcept; + int32_t readFloat(int64_t offset, float& v) const noexcept; + int32_t readDouble(int64_t offset, double& v) const noexcept; + int32_t readString(int64_t offset, folly::StringPiece& v) const noexcept; + int32_t readInt64(int64_t offset, int64_t& v) const noexcept; + int32_t readVid(int64_t offset, int64_t& v) const noexcept; +}; + +} // namespace nebula +#endif // CODEC_ROWREADERV1_H_ diff --git 
a/src/dataman/include/NebulaCodec.h b/src/codec/include/NebulaCodec.h similarity index 100% rename from src/dataman/include/NebulaCodec.h rename to src/codec/include/NebulaCodec.h diff --git a/src/codec/test/CMakeLists.txt b/src/codec/test/CMakeLists.txt new file mode 100644 index 000000000..4c7abf3c4 --- /dev/null +++ b/src/codec/test/CMakeLists.txt @@ -0,0 +1,40 @@ +nebula_add_library( + codec_test_obj OBJECT + ResultSchemaProvider.cpp + SchemaWriter.cpp +) + +set(CODEC_TEST_LIBS + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) + + +nebula_add_test( + NAME row_reader_v1_test + SOURCES RowReaderV1Test.cpp + OBJECTS ${CODEC_TEST_LIBS} + LIBRARIES ${THRIFT_LIBRARIES} wangle gtest +) + +#nebula_add_executable( +# NAME row_reader_bm +# SOURCES RowReaderBenchmark.cpp +# OBJECTS ${CODEC_TEST_LIBS} +# LIBRARIES ${THRIFT_LIBRARIES} follybenchmark wangle boost_regex +#) +# +#nebula_add_test( +# NAME nebula_codec_test +# SOURCES NebulaCodecTest.cpp +# OBJECTS ${CODEC_TEST_LIBS} +# LIBRARIES ${THRIFT_LIBRARIES} wangle gtest +#) diff --git a/src/dataman/test/NebulaCodecTest.cpp b/src/codec/test/NebulaCodecTest.cpp similarity index 100% rename from src/dataman/test/NebulaCodecTest.cpp rename to src/codec/test/NebulaCodecTest.cpp diff --git a/src/dataman/ResultSchemaProvider.cpp b/src/codec/test/ResultSchemaProvider.cpp similarity index 72% rename from src/dataman/ResultSchemaProvider.cpp rename to src/codec/test/ResultSchemaProvider.cpp index 53b15b381..c1400f26d 100644 --- a/src/dataman/ResultSchemaProvider.cpp +++ b/src/codec/test/ResultSchemaProvider.cpp @@ -5,14 +5,14 @@ */ #include "base/Base.h" -#include "dataman/ResultSchemaProvider.h" +#include "codec/test/ResultSchemaProvider.h" namespace nebula { using folly::hash::SpookyHashV2; -using cpp2::ColumnDef; -using cpp2::ValueType; -using cpp2::Schema; +using meta::cpp2::ColumnDef; +using meta::cpp2::PropertyType; +using meta::cpp2::Schema; /*********************************** * @@ -23,13 +23,13 @@ 
ResultSchemaProvider::ResultSchemaField::ResultSchemaField(const ColumnDef* col) : column_(col) {} -const char* ResultSchemaProvider::ResultSchemaField::getName() const { +const char* ResultSchemaProvider::ResultSchemaField::name() const { DCHECK(!!column_); return column_->get_name().c_str(); } -const ValueType& ResultSchemaProvider::ResultSchemaField::getType() const { +const PropertyType ResultSchemaProvider::ResultSchemaField::type() const { DCHECK(!!column_); return column_->get_type(); } @@ -45,7 +45,7 @@ bool ResultSchemaProvider::ResultSchemaField::hasDefault() const { } -std::string ResultSchemaProvider::ResultSchemaField::getDefaultValue() const { +const Value& ResultSchemaProvider::ResultSchemaField::defaultValue() const { LOG(FATAL) << "Not Supported"; } @@ -59,7 +59,8 @@ ResultSchemaProvider::ResultSchemaProvider(Schema schema) : columns_(std::move(schema.get_columns())) { for (int64_t i = 0; i < static_cast(columns_.size()); i++) { const std::string& name = columns_[i].get_name(); - nameIndex_.emplace(std::make_pair(SpookyHashV2::Hash64(name.data(), name.size(), 0), i)); + nameIndex_.emplace( + std::make_pair(SpookyHashV2::Hash64(name.data(), name.size(), 0), i)); } } @@ -87,26 +88,26 @@ const char* ResultSchemaProvider::getFieldName(int64_t index) const { } -const ValueType& ResultSchemaProvider::getFieldType(int64_t index) const { +const PropertyType ResultSchemaProvider::getFieldType(int64_t index) const { if (index < 0 || index >= static_cast(columns_.size())) { - return CommonConstants::kInvalidValueType(); + return PropertyType::UNKNOWN; } return columns_[index].get_type(); } -const ValueType& ResultSchemaProvider::getFieldType(const folly::StringPiece name) const { +const PropertyType ResultSchemaProvider::getFieldType(const folly::StringPiece name) const { auto index = getFieldIndex(name); if (index < 0) { - return CommonConstants::kInvalidValueType(); + return PropertyType::UNKNOWN; } return columns_[index].get_type(); } -std::shared_ptr 
ResultSchemaProvider::field( - int64_t index) const { +std::shared_ptr +ResultSchemaProvider::field(int64_t index) const { if (index < 0 || index >= static_cast(columns_.size())) { return std::shared_ptr(); } @@ -114,8 +115,8 @@ std::shared_ptr ResultSchemaProvider::field } -std::shared_ptr ResultSchemaProvider::field( - const folly::StringPiece name) const { +std::shared_ptr +ResultSchemaProvider::field(const folly::StringPiece name) const { auto index = getFieldIndex(name); if (index < 0) { return std::shared_ptr(); diff --git a/src/dataman/ResultSchemaProvider.h b/src/codec/test/ResultSchemaProvider.h similarity index 53% rename from src/dataman/ResultSchemaProvider.h rename to src/codec/test/ResultSchemaProvider.h index 8b091d899..e1857dcb0 100644 --- a/src/dataman/ResultSchemaProvider.h +++ b/src/codec/test/ResultSchemaProvider.h @@ -4,8 +4,8 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#ifndef DATAMAN_RESULTSCHEMAPROVIDER_H_ -#define DATAMAN_RESULTSCHEMAPROVIDER_H_ +#ifndef CODEC_TEST_RESULTSCHEMAPROVIDER_H_ +#define CODEC_TEST_RESULTSCHEMAPROVIDER_H_ #include "base/Base.h" #include "meta/SchemaProviderIf.h" @@ -13,27 +13,26 @@ namespace nebula { class ResultSchemaProvider : public meta::SchemaProviderIf { - using ColumnDefs = std::vector; + using ColumnDefs = std::vector; public: class ResultSchemaField : public meta::SchemaProviderIf::Field { public: - explicit ResultSchemaField( - const cpp2::ColumnDef* col = nullptr); + explicit ResultSchemaField(const meta::cpp2::ColumnDef* col = nullptr); - const char* getName() const override; - const cpp2::ValueType& getType() const override; + const char* name() const override; + const meta::cpp2::PropertyType type() const override; bool isValid() const override; bool hasDefault() const override; - std::string getDefaultValue() const override; + const Value& defaultValue() const override; private: - const cpp2::ColumnDef* column_; + const meta::cpp2::ColumnDef* column_; }; 
public: - explicit ResultSchemaProvider(cpp2::Schema); + explicit ResultSchemaProvider(meta::cpp2::Schema); virtual ~ResultSchemaProvider() = default; SchemaVer getVersion() const noexcept override { @@ -45,23 +44,26 @@ class ResultSchemaProvider : public meta::SchemaProviderIf { int64_t getFieldIndex(const folly::StringPiece name) const override; const char* getFieldName(int64_t index) const override; - const cpp2::ValueType& getFieldType(int64_t index) const override; - const cpp2::ValueType& getFieldType(const folly::StringPiece name) const override; + const meta::cpp2::PropertyType getFieldType(int64_t index) const override; + const meta::cpp2::PropertyType getFieldType(const folly::StringPiece name) + const override; - std::shared_ptr field(int64_t index) const override; - std::shared_ptr field( - const folly::StringPiece name) const override; + std::shared_ptr + field(int64_t index) const override; + + std::shared_ptr + field(const folly::StringPiece name) const override; protected: SchemaVer schemaVer_{0}; ColumnDefs columns_; // Map of Hash64(field_name) -> array index - UnorderedMap nameIndex_; + std::unordered_map nameIndex_; // Default constructor, only used by SchemaWriter explicit ResultSchemaProvider(SchemaVer ver = 0) : schemaVer_(ver) {} }; } // namespace nebula -#endif // DATAMAN_RESULTSCHEMAPROVIDER_H_ +#endif // CODEC_TEST_RESULTSCHEMAPROVIDER_H_ diff --git a/src/dataman/test/RowReaderBenchmark.cpp b/src/codec/test/RowReaderBenchmark.cpp similarity index 100% rename from src/dataman/test/RowReaderBenchmark.cpp rename to src/codec/test/RowReaderBenchmark.cpp diff --git a/src/dataman/test/RowReaderTest.cpp b/src/codec/test/RowReaderV1Test.cpp similarity index 52% rename from src/dataman/test/RowReaderTest.cpp rename to src/codec/test/RowReaderV1Test.cpp index e7c18ee60..1444578aa 100644 --- a/src/dataman/test/RowReaderTest.cpp +++ b/src/codec/test/RowReaderV1Test.cpp @@ -6,27 +6,26 @@ #include "base/Base.h" #include -#include "dataman/RowReader.h" 
-#include "dataman/SchemaWriter.h" +#include "datatypes/Value.h" +#include "codec/RowReaderV1.h" +#include "codec/test/SchemaWriter.h" namespace nebula { -TEST(RowReader, headerInfo) { +TEST(RowReaderV1, headerInfo) { // Simplest row, nothing in it char data1[] = {0x00}; auto schema1 = std::make_shared(); - auto reader1 = RowReader::getRowReader( - folly::StringPiece(data1, sizeof(data1)), - schema1); + auto reader1 = std::unique_ptr( + new RowReaderV1(folly::StringPiece(data1, sizeof(data1)), schema1)); EXPECT_EQ(0, reader1->schemaVer()); EXPECT_EQ(sizeof(data1), reader1->headerLen_); // With schema version char data2[] = {0x40, 0x01, static_cast(0xFF)}; auto schema2 = std::make_shared(0x00FF01); - auto reader2 = RowReader::getRowReader( - folly::StringPiece(data2, sizeof(data2)), - schema2); + auto reader2 = std::unique_ptr( + new RowReaderV1(folly::StringPiece(data2, sizeof(data2)), schema2)); EXPECT_EQ(0x0000FF01, reader2->schemaVer()); EXPECT_EQ(sizeof(data2), reader2->headerLen_); @@ -34,16 +33,15 @@ TEST(RowReader, headerInfo) { auto schema3 = std::make_shared(0x00FFFF01); for (int i = 0; i < 33; i++) { schema3->appendCol(folly::stringPrintf("Column%02d", i), - cpp2::SupportedType::INT); + meta::cpp2::PropertyType::INT64); } // With schema version and offsets char data3[] = {0x60, 0x01, static_cast(0xFF), static_cast(0xFF), 0x40, static_cast(0xF0)}; - auto reader3 = RowReader::getRowReader( - folly::StringPiece(data3, sizeof(data3)), - schema3); + auto reader3 = std::unique_ptr( + new RowReaderV1(folly::StringPiece(data3, sizeof(data3)), schema3)); EXPECT_EQ(0x00FFFF01, reader3->schemaVer()); EXPECT_EQ(sizeof(data3), reader3->headerLen_); ASSERT_EQ(3, reader3->blockOffsets_.size()); @@ -55,13 +53,12 @@ TEST(RowReader, headerInfo) { auto schema4 = std::make_shared(); for (int i = 0; i < 33; i++) { schema4->appendCol(folly::stringPrintf("Column%02d", i), - cpp2::SupportedType::INT); + meta::cpp2::PropertyType::INT64); } char data4[] = {0x01, static_cast(0xFF), 
0x40, 0x08, static_cast(0xF0)}; - auto reader4 = RowReader::getRowReader( - folly::StringPiece(data4, sizeof(data4)), - schema4); + auto reader4 = std::unique_ptr( + new RowReaderV1(folly::StringPiece(data4, sizeof(data4)), schema4)); EXPECT_EQ(0, reader4->schemaVer()); EXPECT_EQ(sizeof(data4), reader4->headerLen_); ASSERT_EQ(3, reader4->blockOffsets_.size()); @@ -71,37 +68,37 @@ TEST(RowReader, headerInfo) { } -TEST(RowReader, encodedData) { +TEST(RowReaderV1, encodedData) { const char* colName1 = "int_col1"; const std::string colName2("int_col2"); std::string colName3("vid_col"); auto schema = std::make_shared(); // Col 0: bool_col1 -- BOOL - schema->appendCol("bool_col1", cpp2::SupportedType::BOOL); + schema->appendCol("bool_col1", meta::cpp2::PropertyType::BOOL); // Col 1: str_col1 -- STRING schema->appendCol(folly::stringPrintf("str_col1"), - cpp2::SupportedType::STRING); + meta::cpp2::PropertyType::STRING); // Col 2: int_col1 -- INT - schema->appendCol(colName1, cpp2::SupportedType::INT); + schema->appendCol(colName1, meta::cpp2::PropertyType::INT64); // Col 3: int_col2 -- INT - schema->appendCol(colName2, cpp2::SupportedType::INT); + schema->appendCol(colName2, meta::cpp2::PropertyType::INT64); // Col 4: vid_col -- VID schema->appendCol(folly::StringPiece(colName3), - cpp2::SupportedType::VID); + meta::cpp2::PropertyType::VID); // Col 5: str_col2 -- STRING - schema->appendCol("str_col2", cpp2::SupportedType::STRING); + schema->appendCol("str_col2", meta::cpp2::PropertyType::STRING); // Col 6: bool_col2 -- BOOL schema->appendCol(std::string("bool_col2"), - cpp2::SupportedType::BOOL); + meta::cpp2::PropertyType::BOOL); // Col 7: float_col -- FLOAT schema->appendCol(std::string("float_col"), - cpp2::SupportedType::FLOAT); + meta::cpp2::PropertyType::FLOAT); // Col 8: double_col -- DOUBLE schema->appendCol(std::string("double_col"), - cpp2::SupportedType::DOUBLE); + meta::cpp2::PropertyType::DOUBLE); // Col 9: timestamp_col -- TIMESTAMP - 
schema->appendCol("timestamp_col", cpp2::SupportedType::TIMESTAMP); + schema->appendCol("timestamp_col", meta::cpp2::PropertyType::TIMESTAMP); std::string encoded; // Single byte header (Schema version is 0, no offset) @@ -153,7 +150,7 @@ TEST(RowReader, encodedData) { /************************** * Now let's read it *************************/ - auto reader = RowReader::getRowReader(encoded, schema); + auto reader = std::unique_ptr(new RowReaderV1(encoded, schema)); // Header info EXPECT_EQ(0, reader->schemaVer()); @@ -161,118 +158,101 @@ TEST(RowReader, encodedData) { EXPECT_EQ(0, reader->blockOffsets_[0].first); EXPECT_EQ(1, reader->headerLen_); - bool bVal; - int32_t i32Val; - int64_t i64Val; - uint64_t u64Val; - folly::StringPiece sVal; - float fVal; - double dVal; + Value val; // Col 0 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getBool(0, bVal)); - EXPECT_TRUE(bVal); - bVal = false; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getBool("bool_col1", bVal)); - EXPECT_TRUE(bVal); + val = reader->getValueByIndex(0); + EXPECT_EQ(Value::Type::BOOL, val.type()); + EXPECT_TRUE(val.getBool()); + val = reader->getValueByName("bool_col1"); + EXPECT_EQ(Value::Type::BOOL, val.type()); + EXPECT_TRUE(val.getBool()); // Col 1 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getString(1, sVal)); - EXPECT_EQ(str1, sVal.toString()); - sVal.clear(); - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getString("str_col1", sVal)); - EXPECT_EQ(str1, sVal.toString()); + val = reader->getValueByIndex(1); + EXPECT_EQ(Value::Type::STRING, val.type()); + EXPECT_EQ(str1, val.getStr()); + val = reader->getValueByName("str_col1"); + EXPECT_EQ(Value::Type::STRING, val.type()); + EXPECT_EQ(str1, val.getStr()); // Col 2 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt(2, i32Val)); - EXPECT_EQ(100, i32Val); - i32Val = 0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt("int_col1", i32Val)); - EXPECT_EQ(100, i32Val); + val = reader->getValueByIndex(2); + EXPECT_EQ(Value::Type::INT, val.type()); + 
EXPECT_EQ(100, val.getInt()); + val = reader->getValueByName("int_col1"); + EXPECT_EQ(Value::Type::INT, val.type()); + EXPECT_EQ(100, val.getInt()); // Col 3 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt(3, i32Val)); - EXPECT_EQ(0xFFFFFFFF, i32Val); - i32Val = 0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt(3, i64Val)); - EXPECT_EQ(0xFFFFFFFFFFFFFFFFL, i64Val); - i64Val = 0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt("int_col2", u64Val)); - EXPECT_EQ(0xFFFFFFFFFFFFFFFFL, u64Val); + val = reader->getValueByIndex(3); + EXPECT_EQ(Value::Type::INT, val.type()); + EXPECT_EQ(0xFFFFFFFFFFFFFFFFL, val.getInt()); + val = reader->getValueByName("int_col2"); + EXPECT_EQ(Value::Type::INT, val.type()); + EXPECT_EQ(0xFFFFFFFFFFFFFFFFL, val.getInt()); // Col 4 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getVid(4, i64Val)); - EXPECT_EQ(0x8877665544332211L, i64Val); - i64Val = 0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getVid("vid_col", i64Val)); - EXPECT_EQ(0x8877665544332211L, i64Val); + val = reader->getValueByIndex(4); + EXPECT_EQ(Value::Type::STRING, val.type()); + EXPECT_EQ(sizeof(int64_t), val.getStr().size()); + int64_t vid = 0; + memcpy(reinterpret_cast(&vid), val.getStr().data(), sizeof(int64_t)); + EXPECT_EQ(0x8877665544332211L, vid); + val = reader->getValueByName("vid_col"); + EXPECT_EQ(Value::Type::STRING, val.type()); + EXPECT_EQ(sizeof(int64_t), val.getStr().size()); + vid = 0; + memcpy(reinterpret_cast(&vid), val.getStr().data(), sizeof(int64_t)); + EXPECT_EQ(0x8877665544332211L, vid); // Col 5 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getString(5, sVal)); - EXPECT_EQ(str2, sVal.toString()); - sVal.clear(); - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getString("str_col2", sVal)); - EXPECT_EQ(str2, sVal.toString()); + val = reader->getValueByIndex(5); + EXPECT_EQ(Value::Type::STRING, val.type()); + EXPECT_EQ(str2, val.getStr()); + val = reader->getValueByName("str_col2"); + EXPECT_EQ(Value::Type::STRING, val.type()); + EXPECT_EQ(str2, 
val.getStr()); // Col 6 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getBool(6, bVal)); - EXPECT_FALSE(bVal); - bVal = true; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getBool("bool_col2", bVal)); - EXPECT_FALSE(bVal); + val = reader->getValueByIndex(6); + EXPECT_EQ(Value::Type::BOOL, val.type()); + EXPECT_FALSE(val.getBool()); + val = reader->getValueByName("bool_col2"); + EXPECT_EQ(Value::Type::BOOL, val.type()); + EXPECT_FALSE(val.getBool()); // Col 7 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getFloat(7, fVal)); - EXPECT_FLOAT_EQ(pi, fVal); - fVal = 0.0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getFloat("float_col", fVal)); - EXPECT_FLOAT_EQ(pi, fVal); + val = reader->getValueByIndex(7); + EXPECT_EQ(Value::Type::FLOAT, val.type()); + EXPECT_DOUBLE_EQ(pi, val.getFloat()); + val = reader->getValueByName("float_col"); + EXPECT_EQ(Value::Type::FLOAT, val.type()); + EXPECT_DOUBLE_EQ(pi, val.getFloat()); // Col 8 - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getDouble(8, dVal)); - EXPECT_DOUBLE_EQ(e, dVal); - dVal = 0.0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getDouble("double_col", dVal)); - EXPECT_DOUBLE_EQ(e, dVal); + val = reader->getValueByIndex(8); + EXPECT_EQ(Value::Type::FLOAT, val.type()); + EXPECT_DOUBLE_EQ(e, val.getFloat()); + val = reader->getValueByName("double_col"); + EXPECT_EQ(Value::Type::FLOAT, val.type()); + EXPECT_DOUBLE_EQ(e, val.getFloat()); // Col 9 - i64Val = 0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt(9, i64Val)); - EXPECT_EQ(1551331827, i64Val); - i64Val = 0; - EXPECT_EQ(ResultType::SUCCEEDED, - reader->getInt("timestamp_col", i64Val)); - EXPECT_EQ(1551331827, i64Val); + val = reader->getValueByIndex(9); + EXPECT_EQ(Value::Type::INT, val.type()); + EXPECT_EQ(1551331827, val.getInt()); + val = reader->getValueByName("timestamp_col"); + EXPECT_EQ(Value::Type::INT, val.type()); + EXPECT_EQ(1551331827, val.getInt()); // Col 10 -- non-existing column - EXPECT_EQ(ResultType::E_INDEX_OUT_OF_RANGE, - reader->getBool(10, 
bVal)); - EXPECT_EQ(ResultType::E_NAME_NOT_FOUND, - reader->getBool("bool_col3", bVal)); + val = reader->getValueByIndex(10); + EXPECT_EQ(Value::Type::NULLVALUE, val.type()); } -TEST(RowReader, iterator) { +TEST(RowReaderV1, iterator) { std::string encoded; encoded.append(1, 0); encoded.append(1, 16); @@ -283,23 +263,22 @@ TEST(RowReader, iterator) { auto schema = std::make_shared(); for (int i = 0; i < 64; i++) { schema->appendCol(folly::stringPrintf("Col%02d", i), - cpp2::SupportedType::INT); + meta::cpp2::PropertyType::INT64); encoded.append(1, i + 1); } - auto reader = RowReader::getRowReader(encoded, schema); + auto reader = std::unique_ptr(new RowReaderV1(encoded, schema)); auto it = reader->begin(); - int32_t v1; - int32_t v2; - for (int i = 0; i < 64; i++) { - EXPECT_EQ(ResultType::SUCCEEDED, reader->getInt(i, v1)); - EXPECT_EQ(ResultType::SUCCEEDED, it->getInt(v2)); - EXPECT_EQ(v1, v2); + int32_t index = 0; + while (it != reader->end()) { + Value v = reader->getValueByIndex(index); + EXPECT_EQ(Value::Type::INT, v.type()); + EXPECT_EQ(v, it->value()); ++it; + ++index; } - EXPECT_FALSE((bool)it); - EXPECT_EQ(it, reader->end()); + EXPECT_EQ(64, index); } } // namespace nebula diff --git a/src/dataman/test/RowSetReaderWriterTest.cpp b/src/codec/test/RowSetReaderWriterTest.cpp similarity index 100% rename from src/dataman/test/RowSetReaderWriterTest.cpp rename to src/codec/test/RowSetReaderWriterTest.cpp diff --git a/src/dataman/test/RowUpdaterTest.cpp b/src/codec/test/RowUpdaterTest.cpp similarity index 100% rename from src/dataman/test/RowUpdaterTest.cpp rename to src/codec/test/RowUpdaterTest.cpp diff --git a/src/dataman/test/RowWriterBenchmark.cpp b/src/codec/test/RowWriterBenchmark.cpp similarity index 100% rename from src/dataman/test/RowWriterBenchmark.cpp rename to src/codec/test/RowWriterBenchmark.cpp diff --git a/src/dataman/test/RowWriterTest.cpp b/src/codec/test/RowWriterTest.cpp similarity index 100% rename from src/dataman/test/RowWriterTest.cpp 
rename to src/codec/test/RowWriterTest.cpp diff --git a/src/dataman/RowWriter.cpp b/src/codec/test/RowWriterV1.cpp similarity index 100% rename from src/dataman/RowWriter.cpp rename to src/codec/test/RowWriterV1.cpp diff --git a/src/dataman/RowWriter.h b/src/codec/test/RowWriterV1.h similarity index 100% rename from src/dataman/RowWriter.h rename to src/codec/test/RowWriterV1.h diff --git a/src/dataman/RowWriter.inl b/src/codec/test/RowWriterV1.inl similarity index 100% rename from src/dataman/RowWriter.inl rename to src/codec/test/RowWriterV1.inl diff --git a/src/dataman/SchemaWriter.cpp b/src/codec/test/SchemaWriter.cpp similarity index 65% rename from src/dataman/SchemaWriter.cpp rename to src/codec/test/SchemaWriter.cpp index d9956e3f4..e9a1ba5c7 100644 --- a/src/dataman/SchemaWriter.cpp +++ b/src/codec/test/SchemaWriter.cpp @@ -5,14 +5,13 @@ */ #include "base/Base.h" -#include "dataman/SchemaWriter.h" +#include "codec/test/SchemaWriter.h" namespace nebula { -using cpp2::Schema; -using cpp2::ValueType; -using cpp2::SupportedType; -using cpp2::ColumnDef; +using meta::cpp2::Schema; +using meta::cpp2::PropertyType; +using meta::cpp2::ColumnDef; Schema SchemaWriter::moveSchema() noexcept { Schema schema; @@ -24,23 +23,14 @@ Schema SchemaWriter::moveSchema() noexcept { SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name, - SupportedType type) noexcept { - ValueType vt; - vt.set_type(type); - - return appendCol(name, std::move(vt)); -} - - -SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name, - ValueType&& type) noexcept { + PropertyType type) noexcept { using folly::hash::SpookyHashV2; uint64_t hash = SpookyHashV2::Hash64(name.data(), name.size(), 0); DCHECK(nameIndex_.find(hash) == nameIndex_.end()); ColumnDef col; col.set_name(name.toString()); - col.set_type(std::move(type)); + col.set_type(type); columns_.emplace_back(std::move(col)); nameIndex_.emplace(std::make_pair(hash, columns_.size() - 1)); diff --git a/src/dataman/SchemaWriter.h 
b/src/codec/test/SchemaWriter.h similarity index 58% rename from src/dataman/SchemaWriter.h rename to src/codec/test/SchemaWriter.h index fa5774bc9..5c455b337 100644 --- a/src/dataman/SchemaWriter.h +++ b/src/codec/test/SchemaWriter.h @@ -4,11 +4,11 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#ifndef DATAMAN_SCHEMAWRITER_H_ -#define DATAMAN_SCHEMAWRITER_H_ +#ifndef CODEC_TEST_SCHEMAWRITER_H_ +#define CODEC_TEST_SCHEMAWRITER_H_ #include "base/Base.h" -#include "dataman/ResultSchemaProvider.h" +#include "codec/test/ResultSchemaProvider.h" namespace nebula { @@ -17,17 +17,14 @@ class SchemaWriter : public ResultSchemaProvider { explicit SchemaWriter(SchemaVer ver = 0) : ResultSchemaProvider(ver) {} // Move the schema out of the writer - cpp2::Schema moveSchema() noexcept; + meta::cpp2::Schema moveSchema() noexcept; SchemaWriter& appendCol(folly::StringPiece name, - cpp2::SupportedType type) noexcept; - - SchemaWriter& appendCol(folly::StringPiece name, - cpp2::ValueType&& type)noexcept; + meta::cpp2::PropertyType type) noexcept; private: }; } // namespace nebula -#endif // DATAMAN_SCHEMAWRITER_H_ +#endif // CODEC_TEST_SCHEMAWRITER_H_ diff --git a/src/dataman/CMakeLists.txt b/src/dataman/CMakeLists.txt deleted file mode 100644 index f4445b89b..000000000 --- a/src/dataman/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -nebula_add_library( - dataman_obj OBJECT - ResultSchemaProvider.cpp - SchemaWriter.cpp - RowSetReader.cpp - RowSetWriter.cpp - RowReader.cpp - RowUpdater.cpp - RowWriter.cpp - NebulaCodecImpl.cpp -) - -nebula_add_subdirectory(test) diff --git a/src/dataman/DataCommon.h b/src/dataman/DataCommon.h deleted file mode 100644 index 0179ea97f..000000000 --- a/src/dataman/DataCommon.h +++ /dev/null @@ -1,52 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -#ifndef DATAMAN_DATACOMMON_H_ -#define DATAMAN_DATACOMMON_H_ - -#include "base/Base.h" - -namespace nebula { - -enum class ResultType { - SUCCEEDED = 0, - E_NAME_NOT_FOUND = -1, - E_INDEX_OUT_OF_RANGE = -2, - E_INCOMPATIBLE_TYPE = -3, - E_VALUE_OUT_OF_RANGE = -4, - E_DATA_INVALID = -5, -}; - - -using FieldValue = boost::variant; -#define VALUE_TYPE_BOOL 0 -#define VALUE_TYPE_INT 1 -#define VALUE_TYPE_FLOAT 2 -#define VALUE_TYPE_DOUBLE 3 -#define VALUE_TYPE_STRING 4 - -template -typename std::enable_if< - std::is_integral< - typename std::remove_cv< - typename std::remove_reference::type - >::type - >::value, - bool ->::type -intToBool(IntType iVal) { - return iVal != 0; -} - -inline bool strToBool(folly::StringPiece str) { - return str == "Y" || str == "y" || str == "T" || str == "t" || - str == "yes" || str == "Yes" || str == "YES" || - str == "true" || str == "True" || str == "TRUE"; -} - -} // namespace nebula -#endif // DATAMAN_DATACOMMON_H_ - diff --git a/src/dataman/RowReader.cpp b/src/dataman/RowReader.cpp deleted file mode 100644 index 7a6dcea68..000000000 --- a/src/dataman/RowReader.cpp +++ /dev/null @@ -1,635 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -#include "base/Base.h" -#include "dataman/RowReader.h" - -namespace nebula { - -using nebula::meta::SchemaManager; - -/********************************************* - * - * class RowReader::Cell - * - ********************************************/ -ResultType RowReader::Cell::getBool(bool& v) const noexcept { - RR_CELL_GET_VALUE(Bool); -} - - -ResultType RowReader::Cell::getFloat(float& v) const noexcept { - RR_CELL_GET_VALUE(Float); -} - - -ResultType RowReader::Cell::getDouble(double& v) const noexcept { - RR_CELL_GET_VALUE(Double); -} - - -ResultType RowReader::Cell::getString(folly::StringPiece& v) const noexcept { - RR_CELL_GET_VALUE(String); -} - - -ResultType RowReader::Cell::getVid(int64_t& v) const noexcept { - RR_CELL_GET_VALUE(Vid); -} - - -/********************************************* - * - * class RowReader::Iterator - * - ********************************************/ -RowReader::Iterator::Iterator(const RowReader* reader, - size_t numFields, - int64_t index) - : reader_(reader) - , numFields_(numFields) - , index_(index) { - cell_.reset(new Cell(reader_, this)); -} - - -RowReader::Iterator::Iterator(Iterator&& iter) - : reader_(iter.reader_) - , numFields_(iter.numFields_) - , cell_(std::move(iter.cell_)) - , index_(iter.index_) - , bytes_(iter.bytes_) - , offset_(iter.offset_) {} - - -const RowReader::Cell& RowReader::Iterator::operator*() const { - return *cell_; -} - - -const RowReader::Cell* RowReader::Iterator::operator->() const { - return cell_.get(); -} - - -RowReader::Iterator& RowReader::Iterator::operator++() { - if (*this) { - if (bytes_ > 0) { - offset_ += bytes_; - } else { - offset_ = reader_->skipToNext(index_, offset_); - if (offset_ < 0) { - // Something is wrong - index_ = numFields_; - return *this; - } - } - - bytes_ = 0; - index_++; - } - - return *this; -} - - -bool RowReader::Iterator::operator==(const Iterator& rhs) const noexcept { - return reader_ == rhs.reader_ && - numFields_ == rhs.numFields_ && - index_ == 
rhs.index_; -} - - -RowReader::Iterator::operator bool() const { - return index_ != static_cast(numFields_); -} - - -/********************************************* - * - * class RowReader - * - ********************************************/ -// static -std::unique_ptr RowReader::getTagPropReader( - meta::SchemaManager* schemaMan, - folly::StringPiece row, - GraphSpaceID space, - TagID tag) { - CHECK_NOTNULL(schemaMan); - int32_t ver = getSchemaVer(row); - if (ver >= 0) { - return std::unique_ptr(new RowReader( - row, - schemaMan->getTagSchema(space, tag, ver))); - } else { - // Invalid data - // TODO We need a better error handler here - LOG(FATAL) << "Invalid schema version in the row data!"; - return nullptr; - } -} - - -// static -std::unique_ptr RowReader::getEdgePropReader( - meta::SchemaManager* schemaMan, - folly::StringPiece row, - GraphSpaceID space, - EdgeType edge) { - CHECK_NOTNULL(schemaMan); - int32_t ver = getSchemaVer(row); - if (ver >= 0) { - return std::unique_ptr(new RowReader( - row, - schemaMan->getEdgeSchema(space, edge, ver))); - } else { - // Invalid data - // TODO We need a better error handler here - LOG(FATAL) << "Invalid schema version in the row data!"; - return nullptr; - } -} - - -// static -std::unique_ptr RowReader::getRowReader( - folly::StringPiece row, - std::shared_ptr schema) { - SchemaVer ver = getSchemaVer(row); - CHECK_EQ(ver, schema->getVersion()); - return std::unique_ptr(new RowReader(row, std::move(schema))); -} - - -// static -int32_t RowReader::getSchemaVer(folly::StringPiece row) { - const uint8_t* it = reinterpret_cast(row.begin()); - if (reinterpret_cast(it) == row.end()) { - LOG(ERROR) << "Row data is empty, so there is no schema version"; - return 0; - } - - // The first three bits indicate the number of bytes for the - // schema version. 
If the number is zero, no schema version - // presents - size_t verBytes = *(it++) >> 5; - int32_t ver = 0; - if (verBytes > 0) { - if (verBytes + 1 > row.size()) { - // Data is too short - LOG(ERROR) << "Row data is too short"; - return 0; - } - // Schema Version is stored in Little Endian - for (size_t i = 0; i < verBytes; i++) { - ver |= (uint32_t(*(it++)) << (8 * i)); - } - } - - return ver; -} - - -RowReader::RowReader(folly::StringPiece row, - std::shared_ptr schema) - : schema_{std::move(schema)} { - CHECK(!!schema_) << "A schema must be provided"; - - if (processHeader(row)) { - // data_.begin() points to the first field - data_.reset(row.begin() + headerLen_, row.size() - headerLen_); - } else { - // Invalid data - // TODO We need a better error handler here - LOG(FATAL) << "Invalid row data!"; - } -} - - -bool RowReader::processHeader(folly::StringPiece row) { - const uint8_t* it = reinterpret_cast(row.begin()); - if (reinterpret_cast(it) == row.end()) { - return false; - } - - DCHECK(!!schema_) << "A schema must be provided"; - - // The last three bits indicate the number of bytes for offsets - // The first three bits indicate the number of bytes for the - // schena version. 
If the number is zero, no schema version - // presents - numBytesForOffset_ = (*it & 0x07) + 1; - int32_t verBytes = *(it++) >> 5; - it += verBytes; - - // Process the block offsets - // Block offsets point to the start of every 16 fields, except the - // first 16 fields - // Block offsets are stored in Little Endian - uint32_t numFields = schema_->getNumFields(); - uint32_t numOffsets = (numFields >> 4); - if (numBytesForOffset_ * numOffsets + verBytes + 1 > row.size()) { - // Data is too short - LOG(ERROR) << "Row data is too short"; - return false; - } - offsets_.resize(numFields + 1, -1); - offsets_[0] = 0; - blockOffsets_.emplace_back(0, 0); - blockOffsets_.reserve(numOffsets); - for (uint32_t i = 0; i < numOffsets; i++) { - int64_t offset = 0; - for (int32_t j = 0; j < numBytesForOffset_; j++) { - offset |= (uint64_t(*(it++)) << (8 * j)); - } - blockOffsets_.emplace_back(offset, 0); - offsets_[16 * (i + 1)] = offset; - } - // Now done with the header - - headerLen_ = reinterpret_cast(it) - row.begin(); - offsets_[numFields] = row.size() - headerLen_; - - return true; -} - - -int32_t RowReader::numFields() const noexcept { - return schema_->getNumFields(); -} - - -SchemaVer RowReader::schemaVer() const noexcept { - return schema_->getVersion(); -} - - -int64_t RowReader::skipToNext(int64_t index, int64_t offset) const noexcept { - const cpp2::ValueType& vType = schema_->getFieldType(index); - CHECK(vType != CommonConstants::kInvalidValueType()) - << "No schema for the index " << index; - if (offsets_[index + 1] >= 0) { - return offsets_[index + 1]; - } - - switch (vType.get_type()) { - case cpp2::SupportedType::BOOL: { - // One byte - offset++; - break; - } - case cpp2::SupportedType::INT: - case cpp2::SupportedType::TIMESTAMP: { - int64_t v; - int32_t len = readInteger(offset, v); - if (len <= 0) { - return static_cast(ResultType::E_DATA_INVALID); - } - offset += len; - break; - } - case cpp2::SupportedType::FLOAT: { - // Four bytes - offset += sizeof(float); 
- break; - } - case cpp2::SupportedType::DOUBLE: { - // Eight bytes - offset += sizeof(double); - break; - } - case cpp2::SupportedType::STRING: { - int64_t strLen; - int32_t intLen = readInteger(offset, strLen); - if (intLen <= 0) { - return static_cast(ResultType::E_DATA_INVALID); - } - offset += intLen + strLen; - break; - } - case cpp2::SupportedType::VID: { - // Eight bytes - offset += sizeof(int64_t); - break; - } - default: { - // TODO - LOG(FATAL) << "Unimplemented"; - } - } - - if (offset > static_cast(data_.size())) { - return static_cast(ResultType::E_DATA_INVALID); - } - - // Update offsets - offsets_[index + 1] = offset; - // Update block offsets - int32_t base = (index + 1) >> 4; - blockOffsets_[base].second = ((index + 1) & 0x0F); - - return offset; -} - - -int64_t RowReader::skipToField(int64_t index) const noexcept { - DCHECK_GE(index, 0); - if (index >= static_cast(schema_->getNumFields())) { - // Index is out of range - return static_cast(ResultType::E_INDEX_OUT_OF_RANGE); - } - - int64_t base = index >> 4; - const auto& blockOffset = blockOffsets_[base]; - base <<= 4; - int64_t maxVisitedIndex = base + blockOffset.second; - if (index <= maxVisitedIndex) { - return offsets_[index]; - } - - int64_t offset = offsets_[maxVisitedIndex]; - for (int64_t i = maxVisitedIndex; i < base + (index & 0x0000000f); i++) { - offset = skipToNext(i, offset); - if (offset < 0) { - return static_cast(ResultType::E_DATA_INVALID); - } - } - - return offset; -} - - -int32_t RowReader::readFloat(int64_t offset, float& v) const noexcept { - if (offset + sizeof(float) > data_.size()) { - return static_cast(ResultType::E_DATA_INVALID); - } - - memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(float)); - - return sizeof(float); -} - - -int32_t RowReader::readDouble(int64_t offset, double& v) const noexcept { - if (offset + sizeof(double) > data_.size()) { - return static_cast(ResultType::E_DATA_INVALID); - } - - memcpy(reinterpret_cast(&v), &(data_[offset]), 
sizeof(double)); - - return sizeof(double); -} - - -int32_t RowReader::readString(int64_t offset, folly::StringPiece& v) - const noexcept { - int64_t strLen; - int32_t intLen = readInteger(offset, strLen); - CHECK_GT(intLen, 0) << "Invalid string length"; - if (offset + intLen + strLen > static_cast(data_.size())) { - return static_cast(ResultType::E_DATA_INVALID); - } - - v = data_.subpiece(offset + intLen, strLen); - return intLen + strLen; -} - - -int32_t RowReader::readInt64(int64_t offset, int64_t& v) const noexcept { - if (offset + sizeof(int64_t) > data_.size()) { - return static_cast(ResultType::E_DATA_INVALID); - } - - // VID is stored in Little Endian - memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(int64_t)); - - return sizeof(int64_t); -} - - -int32_t RowReader::readVid(int64_t offset, int64_t& v) const noexcept { - return readInt64(offset, v); -} - - -ResultType RowReader::getBool(int64_t index, int64_t& offset, bool& v) - const noexcept { - switch (schema_->getFieldType(index).get_type()) { - case cpp2::SupportedType::BOOL: { - v = intToBool(data_[offset]); - offset++; - break; - } - case cpp2::SupportedType::INT: - case cpp2::SupportedType::TIMESTAMP: { - int64_t intV; - int32_t numBytes = readInteger(offset, intV); - if (numBytes > 0) { - v = intToBool(intV); - offset += numBytes; - } else { - return static_cast(numBytes); - } - break; - } - case cpp2::SupportedType::STRING: { - folly::StringPiece strV; - int32_t numBytes = readString(offset, strV); - if (numBytes > 0) { - v = strToBool(strV); - offset += numBytes; - } else { - return static_cast(numBytes); - } - break; - } - default: { - return ResultType::E_INCOMPATIBLE_TYPE; - } - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowReader::getFloat(int64_t index, int64_t& offset, float& v) - const noexcept { - switch (schema_->getFieldType(index).get_type()) { - case cpp2::SupportedType::FLOAT: { - int32_t numBytes = readFloat(offset, v); - if (numBytes < 0) { - return 
static_cast(numBytes); - } - offset += numBytes; - break; - } - case cpp2::SupportedType::DOUBLE: { - double d; - int32_t numBytes = readDouble(offset, d); - if (numBytes < 0) { - return static_cast(numBytes); - } - v = static_cast(d); - offset += numBytes; - break; - } - default: { - return ResultType::E_INCOMPATIBLE_TYPE; - } - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowReader::getDouble(int64_t index, int64_t& offset, double& v) - const noexcept { - switch (schema_->getFieldType(index).get_type()) { - case cpp2::SupportedType::FLOAT: { - float f; - int32_t numBytes = readFloat(offset, f); - if (numBytes < 0) { - return static_cast(numBytes); - } - v = static_cast(f); - offset += numBytes; - break; - } - case cpp2::SupportedType::DOUBLE: { - int32_t numBytes = readDouble(offset, v); - if (numBytes < 0) { - return static_cast(numBytes); - } - offset += numBytes; - break; - } - default: { - return ResultType::E_INCOMPATIBLE_TYPE; - } - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowReader::getString(int64_t index, - int64_t& offset, - folly::StringPiece& v) const noexcept { - switch (schema_->getFieldType(index).get_type()) { - case cpp2::SupportedType::STRING: { - int32_t numBytes = readString(offset, v); - if (numBytes < 0) { - return static_cast(numBytes); - } - offset += numBytes; - break; - } - default: { - return ResultType::E_INCOMPATIBLE_TYPE; - } - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowReader::getInt64(int64_t index, int64_t& offset, int64_t& v) - const noexcept { - switch (schema_->getFieldType(index).get_type()) { - case cpp2::SupportedType::INT: - case cpp2::SupportedType::TIMESTAMP: { - int32_t numBytes = readInteger(offset, v); - if (numBytes < 0) { - return static_cast(numBytes); - } - offset += numBytes; - break; - } - case cpp2::SupportedType::VID: { - int32_t numBytes = readVid(offset, v); - if (numBytes < 0) { - return static_cast(numBytes); - } - offset += numBytes; - break; - } - default: { - return 
ResultType::E_INCOMPATIBLE_TYPE; - } - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowReader::getVid(int64_t index, int64_t& offset, int64_t& v) - const noexcept { - auto fieldType = schema_->getFieldType(index).get_type(); - if (fieldType == cpp2::SupportedType::INT || fieldType == cpp2::SupportedType::VID) - return getInt64(index, offset, v); - else - return ResultType::E_INCOMPATIBLE_TYPE; -} - - -RowReader::Iterator RowReader::begin() const noexcept { - return Iterator(this, schema_->getNumFields(), 0); -} - - -RowReader::Iterator RowReader::end() const noexcept { - auto numFields = schema_->getNumFields(); - return Iterator(this, numFields, numFields); -} - - -/*************************************************** - * - * Field Accessors - * - **************************************************/ -ResultType RR_GET_VALUE_BY_NAME(Bool, bool) - -ResultType RowReader::getBool(int64_t index, bool& v) const noexcept { - RR_GET_OFFSET() - return getBool(index, offset, v); -} - - -ResultType RR_GET_VALUE_BY_NAME(Float, float) - -ResultType RowReader::getFloat(int64_t index, float& v) const noexcept { - RR_GET_OFFSET() - return getFloat(index, offset, v); -} - - -ResultType RR_GET_VALUE_BY_NAME(Double, double) - -ResultType RowReader::getDouble(int64_t index, double& v) const noexcept { - RR_GET_OFFSET() - return getDouble(index, offset, v); -} - - -ResultType RR_GET_VALUE_BY_NAME(String, folly::StringPiece) - -ResultType RowReader::getString(int64_t index, folly::StringPiece& v) - const noexcept { - RR_GET_OFFSET() - return getString(index, offset, v); -} - - -ResultType RR_GET_VALUE_BY_NAME(Vid, int64_t) - -ResultType RowReader::getVid(int64_t index, int64_t& v) const noexcept { - RR_GET_OFFSET() - return getVid(index, offset, v); -} - -} // namespace nebula diff --git a/src/dataman/RowReader.h b/src/dataman/RowReader.h deleted file mode 100644 index ebbf5b349..000000000 --- a/src/dataman/RowReader.h +++ /dev/null @@ -1,391 +0,0 @@ -/* Copyright (c) 2018 
vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#ifndef DATAMAN_ROWREADER_H_ -#define DATAMAN_ROWREADER_H_ - -#include "base/Base.h" -#include -#include "gen-cpp2/graph_types.h" -#include "interface/gen-cpp2/common_types.h" -#include "dataman/DataCommon.h" -#include "meta/SchemaProviderIf.h" -#include "meta/SchemaManager.h" -#include "base/ErrorOr.h" - -namespace nebula { - -/** - * This class decodes one row of data - */ -class RowReader { - FRIEND_TEST(RowReader, headerInfo); - FRIEND_TEST(RowReader, encodedData); - FRIEND_TEST(RowWriter, offsetsCreation); - -public: - class Iterator; - - class Cell final { - friend class Iterator; - public: - template - typename std::enable_if::value, ResultType>::type - getInt(T& v) const noexcept; - ResultType getBool(bool& v) const noexcept; - ResultType getFloat(float& v) const noexcept; - ResultType getDouble(double& v) const noexcept; - ResultType getString(folly::StringPiece& v) const noexcept; - ResultType getVid(int64_t& v) const noexcept; - private: - const RowReader* reader_; - Iterator* iter_; - - Cell(const RowReader* reader, Iterator* iter) - : reader_(reader), iter_(iter) {} - }; - friend class Cell; - - - class Iterator final { - friend class RowReader; - public: - Iterator(Iterator&& iter); - const Cell& operator*() const; - const Cell* operator->() const; - Iterator& operator++(); - operator bool() const; - bool operator==(const Iterator& rhs) const noexcept; - private: - const RowReader* reader_; - const size_t numFields_; - std::unique_ptr cell_; - int64_t index_ = 0; - int32_t bytes_ = 0; - int64_t offset_ = 0; - - Iterator(const RowReader* reader, size_t numFields, int64_t index = 0); - }; - - -public: - static std::unique_ptr getTagPropReader( - meta::SchemaManager* schemaMan, - folly::StringPiece row, - GraphSpaceID space, - TagID tag); - - static std::unique_ptr 
getEdgePropReader( - meta::SchemaManager* schemaMan, - folly::StringPiece row, - GraphSpaceID space, - EdgeType edge); - - static std::unique_ptr getRowReader( - folly::StringPiece row, - std::shared_ptr schema); - - static StatusOr getDefaultProp(const meta::SchemaProviderIf* schema, - const std::string& prop) { - auto& vType = schema->getFieldType(prop); - switch (vType.type) { - case nebula::cpp2::SupportedType::BOOL: { - return false; - } - case nebula::cpp2::SupportedType::TIMESTAMP: - case nebula::cpp2::SupportedType::INT: - return static_cast(0); - case nebula::cpp2::SupportedType::VID: { - return static_cast(0); - } - case nebula::cpp2::SupportedType::FLOAT: - case nebula::cpp2::SupportedType::DOUBLE: { - return static_cast(0.0); - } - case nebula::cpp2::SupportedType::STRING: { - return static_cast(""); - } - default: - auto msg = folly::sformat("Unknown type: {}", static_cast(vType.type)); - LOG(ERROR) << "Unknown type: " << msg; - return Status::Error(msg); - } - } - - static ErrorOr getPropByName(const RowReader* reader, - const std::string& prop) { - auto& vType = reader->getSchema()->getFieldType(prop); - switch (vType.type) { - case nebula::cpp2::SupportedType::BOOL: { - bool v; - auto ret = reader->getBool(prop, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::INT: - case nebula::cpp2::SupportedType::TIMESTAMP: { - int64_t v; - auto ret = reader->getInt(prop, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::VID: { - VertexID v; - auto ret = reader->getVid(prop, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::FLOAT: { - float v; - auto ret = reader->getFloat(prop, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return static_cast(v); - } - case nebula::cpp2::SupportedType::DOUBLE: { - double v; - auto ret = reader->getDouble(prop, v); - if (ret != 
ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::STRING: { - folly::StringPiece v; - auto ret = reader->getString(prop, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v.toString(); - } - default: - LOG(FATAL) << "Unknown type: " << static_cast(vType.type); - return ResultType::E_DATA_INVALID; - } - } - - - static ErrorOr getPropByIndex(const RowReader *reader, - const int64_t index) { - auto& vType = reader->getSchema()->getFieldType(index); - switch (vType.get_type()) { - case nebula::cpp2::SupportedType::BOOL: { - bool v; - auto ret = reader->getBool(index, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::INT: - case nebula::cpp2::SupportedType::TIMESTAMP: { - int64_t v; - auto ret = reader->getInt(index, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::VID: { - VertexID v; - auto ret = reader->getVid(index, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::FLOAT: { - float v; - auto ret = reader->getFloat(index, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return static_cast(v); - } - case nebula::cpp2::SupportedType::DOUBLE: { - double v; - auto ret = reader->getDouble(index, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v; - } - case nebula::cpp2::SupportedType::STRING: { - folly::StringPiece v; - auto ret = reader->getString(index, v); - if (ret != ResultType::SUCCEEDED) { - return ret; - } - return v.toString(); - } - default: - LOG(FATAL) << "Unknown type: " << static_cast(vType.get_type()); - return ResultType::E_DATA_INVALID; - } - } - - virtual ~RowReader() = default; - - SchemaVer schemaVer() const noexcept; - int32_t numFields() const noexcept; - - Iterator begin() const noexcept; - Iterator end() const noexcept; - - ResultType getBool(const folly::StringPiece 
name, bool& v) const noexcept; - ResultType getBool(int64_t index, bool& v) const noexcept; - - template - typename std::enable_if::value, ResultType>::type - getInt(const folly::StringPiece name, T& v) const noexcept; - - template - typename std::enable_if::value, ResultType>::type - getInt(int64_t index, T& v) const noexcept; - - ResultType getFloat(const folly::StringPiece name, float& v) const noexcept; - ResultType getFloat(int64_t index, float& v) const noexcept; - - ResultType getDouble(const folly::StringPiece name, double& v) const noexcept; - ResultType getDouble(int64_t index, double& v) const noexcept; - - ResultType getString(const folly::StringPiece name, - folly::StringPiece& v) const noexcept; - ResultType getString(int64_t index, - folly::StringPiece& v) const noexcept; - - ResultType getVid(const folly::StringPiece name, int64_t& v) const noexcept; - ResultType getVid(int64_t index, int64_t& v) const noexcept; - - - std::shared_ptr getSchema() const { - return schema_; - } - - // TODO getPath(const std::string& name) const noexcept; - // TODO getPath(int64_t index) const noexcept; - // TODO getList(const std::string& name) const noexcept; - // TODO getList(int64_t index) const noexcept; - // TODO getSet(const std::string& name) const noexcept; - // TODO getSet(int64_t index) const noexcept; - // TODO getMap(const std::string& name) const noexcept; - // TODO getMap(int64_t index) const noexcept; - -private: - std::shared_ptr schema_; - - folly::StringPiece data_; - int32_t headerLen_ = 0; - int32_t numBytesForOffset_ = 0; - // Block offet value is composed by two integers. The first one is - // the block offset, the second one is the largest index being visited - // in the block. 
This index is zero-based - mutable std::vector> blockOffsets_; - mutable std::vector offsets_; - -private: - static int32_t getSchemaVer(folly::StringPiece row); - - RowReader(folly::StringPiece row, - std::shared_ptr schema); - - // Process the row header infomation - // Returns false when the row data is invalid - bool processHeader(folly::StringPiece row); - - // Process the block offsets (each block contains certain number of fields) - // Returns false when the row data is invalid - bool processBlockOffsets(folly::StringPiece row, int32_t verBytes); - - // Skip to the next field - // Parameter: - // index : the current field index - // offset : the current offset - // When succeeded, the method returns the offset pointing to the - // next field - // When failed, the method returns a negative number - int64_t skipToNext(int64_t index, int64_t offset) const noexcept; - - // Skip to the {index}Th field - // The method retuns the offset of the field - // It returns a negative number when the data corrupts - int64_t skipToField(int64_t index) const noexcept; - - // The following methods all return the number of bytes read - // A negative number will be returned if an error occurs - template - typename std::enable_if::value, int32_t>::type - readInteger(int64_t offset, T& v) const noexcept; - int32_t readFloat(int64_t offset, float& v) const noexcept; - int32_t readDouble(int64_t offset, double& v) const noexcept; - int32_t readString(int64_t offset, folly::StringPiece& v) const noexcept; - int32_t readInt64(int64_t offset, int64_t& v) const noexcept; - int32_t readVid(int64_t offset, int64_t& v) const noexcept; - - // Following methods assume the parameters index and offset are valid - // When succeeded, offset will advance - template - typename std::enable_if::value, ResultType>::type - getInt(int64_t index, int64_t& offset, T& v) const noexcept; - ResultType getBool(int64_t index, int64_t& offset, bool& v) const noexcept; - ResultType getFloat(int64_t index, 
int64_t& offset, float& v) const noexcept; - ResultType getDouble(int64_t index, int64_t& offset, double& v) - const noexcept; - ResultType getString(int64_t index, int64_t& offset, folly::StringPiece& v) - const noexcept; - ResultType getInt64(int64_t index, int64_t& offset, int64_t& v) const noexcept; - ResultType getVid(int64_t index, int64_t& offset, int64_t& v) const noexcept; -}; - -} // namespace nebula - - -#define RR_CELL_GET_VALUE(FN) \ - if (!*iter_) { \ - /* Already reached the end, or an error happened */ \ - return ResultType::E_INDEX_OUT_OF_RANGE; \ - } \ - int64_t offset = iter_->offset_; \ - ResultType res = reader_->get ## FN(iter_->index_, offset, v); \ - if (res == ResultType::SUCCEEDED) { \ - iter_->bytes_ = offset - iter_->offset_; \ - } else { \ - /* Move iterator to the end */ \ - iter_->index_ = iter_->numFields_; \ - } \ - return res - -#define RR_GET_VALUE_BY_NAME(FN, VT) \ - RowReader::get ## FN(const folly::StringPiece name, \ - VT& v) const noexcept { \ - int64_t index = schema_->getFieldIndex(name); \ - if (index < 0) { \ - return ResultType::E_NAME_NOT_FOUND; \ - } else { \ - return get ## FN (index, v); \ - } \ - } - -#define RR_GET_OFFSET() \ - int64_t offset = skipToField(index); \ - if (offset < 0) { \ - return static_cast(offset); \ - } \ - if (index >= static_cast(schema_->getNumFields())) { \ - return ResultType::E_INDEX_OUT_OF_RANGE; \ - } - - -#include "dataman/RowReader.inl" - -#endif // DATAMAN_ROWREADER_H_ diff --git a/src/dataman/RowReader.inl b/src/dataman/RowReader.inl deleted file mode 100644 index 60ffd410f..000000000 --- a/src/dataman/RowReader.inl +++ /dev/null @@ -1,72 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -namespace nebula { - -/********************************************* - * - * class RowReader::Cell - * - ********************************************/ -template -typename std::enable_if::value, ResultType>::type -RowReader::Cell::getInt(T& v) const noexcept { - RR_CELL_GET_VALUE(Int); -} - - -/********************************************* - * - * class RowReader - * - ********************************************/ -template -typename std::enable_if::value, ResultType>::type -RowReader::getInt(int64_t index, int64_t& offset, T& v) const noexcept { - switch (schema_->getFieldType(index).get_type()) { - case cpp2::SupportedType::INT: - case cpp2::SupportedType::TIMESTAMP: { - int32_t numBytes = readInteger(offset, v); - if (numBytes < 0) { - return static_cast(numBytes); - } - offset += numBytes; - break; - } - default: { - return ResultType::E_INCOMPATIBLE_TYPE; - } - } - - return ResultType::SUCCEEDED; -} - - -template -typename std::enable_if::value, ResultType>::type -RR_GET_VALUE_BY_NAME(Int, T) - -template -typename std::enable_if::value, ResultType>::type -RowReader::getInt(int64_t index, T& v) const noexcept { - RR_GET_OFFSET() - return getInt(index, offset, v); -} - - -template -typename std::enable_if::value, int32_t>::type -RowReader::readInteger(int64_t offset, T& v) const noexcept { - const uint8_t* start = reinterpret_cast(&(data_[offset])); - folly::ByteRange range(start, data_.size() - offset); - - // TODO We might want to catch the exception here - v = folly::decodeVarint(range); - return range.begin() - start; -} - -} // namespace nebula - diff --git a/src/dataman/RowSetReader.cpp b/src/dataman/RowSetReader.cpp deleted file mode 100644 index 24219a5a3..000000000 --- a/src/dataman/RowSetReader.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -#include "base/Base.h" -#include "dataman/RowSetReader.h" -#include "dataman/ResultSchemaProvider.h" -#include "dataman/RowReader.h" - -namespace nebula { - -using nebula::meta::SchemaProviderIf; - -/*********************************** - * - * RowSetReader::Iterator class - * - **********************************/ -RowSetReader::Iterator::Iterator( - std::shared_ptr schema, - const folly::StringPiece& data, - int64_t offset) - : schema_(schema) - , data_(data) - , offset_(offset) { - len_ = prepareReader(); -} - - -int32_t RowSetReader::Iterator::prepareReader() { - if (offset_ < static_cast(data_.size())) { - try { - auto begin = reinterpret_cast(data_.begin()); - folly::ByteRange range(begin + offset_, 10); - int32_t rowLen = folly::decodeVarint(range); - int32_t lenBytes = range.begin() - begin - offset_; - reader_ = RowReader::getRowReader( - data_.subpiece(offset_ + lenBytes, rowLen), - schema_); - return lenBytes + rowLen; - } catch (const std::exception& ex) { - LOG(ERROR) << "Failed to read the row length"; - offset_ = data_.size(); - reader_.reset(); - } - } - - return 0; -} - - -const RowReader& RowSetReader::Iterator::operator*() const noexcept { - return *reader_; -} - - -const RowReader* RowSetReader::Iterator::operator->() const noexcept { - return reader_.get(); -} - - -RowSetReader::Iterator& RowSetReader::Iterator::operator++() noexcept { - offset_ += len_; - len_ = prepareReader(); - return *this; -} - - -RowSetReader::Iterator::operator bool() const noexcept { - return offset_ < static_cast(data_.size()); -} - - -bool RowSetReader::Iterator::operator==(const Iterator& rhs) { - return schema_ == rhs.schema_ && - data_ == rhs.data_ && - offset_ == rhs.offset_; -} - - -/*********************************** - * - * RowSetReader class - * - **********************************/ - -RowSetReader::RowSetReader(std::shared_ptr schema, - folly::StringPiece data) - : schema_{schema} - , data_{data} { -} - - -RowSetReader::Iterator RowSetReader::begin() 
const noexcept { - return Iterator(schema_, data_, 0); -} - - -RowSetReader::Iterator RowSetReader::end() const noexcept { - return Iterator(schema_, data_, data_.size()); -} - -} // namespace nebula - diff --git a/src/dataman/RowSetReader.h b/src/dataman/RowSetReader.h deleted file mode 100644 index b167cacf3..000000000 --- a/src/dataman/RowSetReader.h +++ /dev/null @@ -1,77 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#ifndef DATAMAN_ROWSETREADER_H_ -#define DATAMAN_ROWSETREADER_H_ - -#include "base/Base.h" -#include "gen-cpp2/storage_types.h" -#include "meta/SchemaProviderIf.h" - -namespace nebula { - -class RowReader; - -class RowSetReader { -public: - class Iterator final { - friend class RowSetReader; - public: - const RowReader& operator*() const noexcept; - const RowReader* operator->() const noexcept; - - Iterator& operator++() noexcept; - - operator bool() const noexcept; - bool operator==(const Iterator& rhs); - - private: - std::shared_ptr schema_; - // The total length of the encoded row set - const folly::StringPiece& data_; - - std::unique_ptr reader_; - // The offset of the current row - int64_t offset_; - // The length of the current row - int64_t len_; - - Iterator(std::shared_ptr schema, - const folly::StringPiece& data, - int64_t offset); - - // Prepare the RowReader object - // When succeeded, the method returns the total length of the row - // data (including the row length and row data) - int32_t prepareReader(); - }; - -public: - // Constructor to process the property value - // - // In this case, the RowSetReader will *NOT* take the ownership of - // the schema and the record - RowSetReader(std::shared_ptr schema, - folly::StringPiece record); - - virtual ~RowSetReader() = default; - - std::shared_ptr schema() const { - return schema_; - } - - Iterator begin() const 
noexcept; - Iterator end() const noexcept; - -private: - std::shared_ptr schema_; - std::string dataStore_; - folly::StringPiece data_; -}; - -} // namespace nebula -#endif // DATAMAN_ROWSETREADER_H_ - diff --git a/src/dataman/RowSetWriter.cpp b/src/dataman/RowSetWriter.cpp deleted file mode 100644 index a2a63bd31..000000000 --- a/src/dataman/RowSetWriter.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#include "base/Base.h" -#include "dataman/RowSetWriter.h" - -namespace nebula { - -using nebula::meta::SchemaProviderIf; - -RowSetWriter::RowSetWriter(std::shared_ptr schema, - int64_t reservedSize) - : schema_(std::move(schema)) { - data_.reserve(reservedSize); -} - - -void RowSetWriter::writeRowLength(int64_t len) { - VLOG(3) << "Write row length " << len; - uint8_t buf[10]; - size_t lenBytes = folly::encodeVarint(len, buf); - DCHECK_GT(lenBytes, 0UL); - data_.append(reinterpret_cast(buf), lenBytes); -} - - -void RowSetWriter::addRow(RowWriter& writer) { - writeRowLength(writer.size()); - writer.encodeTo(data_); -} - - -void RowSetWriter::addRow(const std::string& data) { - writeRowLength(data.size()); - data_.append(data); -} - -void RowSetWriter::addAll(const std::string& data) { - data_.append(data); -} -} // namespace nebula - diff --git a/src/dataman/RowSetWriter.h b/src/dataman/RowSetWriter.h deleted file mode 100644 index 5d9872852..000000000 --- a/src/dataman/RowSetWriter.h +++ /dev/null @@ -1,56 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -#ifndef DATAMAN_ROWSETWRITER_H_ -#define DATAMAN_ROWSETWRITER_H_ - -#include "base/Base.h" -#include "gen-cpp2/graph_types.h" -#include "meta/SchemaProviderIf.h" -#include "dataman/RowWriter.h" - -namespace nebula { - -class RowSetWriter { -public: - // The reservedSize is the potential data size, It hints the writer - // to reserve the space so that the expensive resize() will not happen - explicit RowSetWriter( - std::shared_ptr schema - = std::shared_ptr(), - int64_t reservedSize = 4096); - - void setSchema(std::shared_ptr schema) { - schema_ = std::move(schema); - } - - std::shared_ptr schema() const { - return schema_; - } - - // Return the reference of the rowset data, so that the caller can - // move the data - std::string& data() { - return data_; - } - - // Both schemas have to be same - void addRow(RowWriter& writer); - // Append the encoded row data - void addRow(const std::string& data); - // Copy existed rows - void addAll(const std::string& data); - -private: - std::shared_ptr schema_; - std::string data_; - - void writeRowLength(int64_t len); -}; - -} // namespace nebula -#endif // DATAMAN_ROWSETWRITER_H_ - diff --git a/src/dataman/RowUpdater.cpp b/src/dataman/RowUpdater.cpp deleted file mode 100644 index bf25c80ea..000000000 --- a/src/dataman/RowUpdater.cpp +++ /dev/null @@ -1,283 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -#include "base/Base.h" -#include "dataman/RowUpdater.h" -#include "dataman/RowWriter.h" - -namespace nebula { - -using folly::hash::SpookyHashV2; -using nebula::meta::SchemaProviderIf; - -RowUpdater::RowUpdater(std::unique_ptr reader, - std::shared_ptr schema) - : schema_(std::move(schema)) - , reader_(std::move(reader)) { - CHECK(!!schema_); -} - - -RowUpdater::RowUpdater(std::shared_ptr schema) - : schema_(std::move(schema)) - , reader_(nullptr) { - CHECK(!!schema_); -} - - -std::string RowUpdater::encode() const noexcept { - std::string encoded; - // TODO Reserve enough space so resize will not happen - encodeTo(encoded); - - return encoded; -} - - -void RowUpdater::encodeTo(std::string& encoded) const noexcept { - RowWriter writer(schema_); - auto it = schema_->begin(); - while (static_cast(it)) { - switch (it->getType().get_type()) { - case cpp2::SupportedType::BOOL: { - RU_OUTPUT_VALUE(bool, Bool, false); - break; - } - case cpp2::SupportedType::INT: - case cpp2::SupportedType::TIMESTAMP: { - RU_OUTPUT_VALUE(int64_t, Int, 0); - break; - } - case cpp2::SupportedType::FLOAT: { - RU_OUTPUT_VALUE(float, Float, (float)0.0); - break; - } - case cpp2::SupportedType::DOUBLE: { - RU_OUTPUT_VALUE(double, Double, (double)0.0); - break; - } - case cpp2::SupportedType::STRING: { - RU_OUTPUT_VALUE(folly::StringPiece, String, ""); - break; - } - case cpp2::SupportedType::VID: { - RU_OUTPUT_VALUE(int64_t, Vid, 0); - break; - } - default: { - LOG(FATAL) << "Unimplemented"; - } - } - ++it; - } - - return writer.encodeTo(encoded); -} - - -/*************************************************** - * - * Field Updaters - * - **************************************************/ -ResultType RowUpdater::setBool(const folly::StringPiece name, - bool v) noexcept { - RU_GET_TYPE_BY_NAME() - - uint64_t hash; - switch (type.get_type()) { - case cpp2::SupportedType::BOOL: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = v; - break; - default: - 
return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::setFloat(const folly::StringPiece name, - float v) noexcept { - RU_GET_TYPE_BY_NAME() - - uint64_t hash; - switch (type.get_type()) { - case cpp2::SupportedType::FLOAT: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = v; - break; - case cpp2::SupportedType::DOUBLE: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = static_cast(v); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::setDouble(const folly::StringPiece name, - double v) noexcept { - RU_GET_TYPE_BY_NAME() - - uint64_t hash; - switch (type.get_type()) { - case cpp2::SupportedType::FLOAT: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = static_cast(v); - break; - case cpp2::SupportedType::DOUBLE: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = v; - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::setString(const folly::StringPiece name, - folly::StringPiece v) noexcept { - RU_GET_TYPE_BY_NAME() - - uint64_t hash; - switch (type.get_type()) { - case cpp2::SupportedType::STRING: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = v.toString(); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::setVid(const folly::StringPiece name, - int64_t v) noexcept { - RU_GET_TYPE_BY_NAME() - - uint64_t hash; - switch (type.get_type()) { - case cpp2::SupportedType::VID: - hash = SpookyHashV2::Hash64(name.begin(), name.size(), 0); - updatedFields_[hash] = v; - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - 
-/*************************************************** - * - * Field Accessors - * - **************************************************/ -ResultType RowUpdater::getBool(const folly::StringPiece name, - bool& v) const noexcept { - RU_CHECK_UPDATED_FIELDS(Bool) - - switch (it->second.which()) { - case VALUE_TYPE_BOOL: - v = boost::get(it->second); - break; - case VALUE_TYPE_INT: - v = intToBool(boost::get(it->second)); - break; - case VALUE_TYPE_STRING: - v = strToBool(boost::get(it->second)); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::getFloat(const folly::StringPiece name, - float& v) const noexcept { - RU_CHECK_UPDATED_FIELDS(Float) - - switch (it->second.which()) { - case VALUE_TYPE_FLOAT: - v = boost::get(it->second); - break; - case VALUE_TYPE_DOUBLE: - v = boost::get(it->second); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::getDouble(const folly::StringPiece name, - double& v) const noexcept { - RU_CHECK_UPDATED_FIELDS(Double) - - switch (it->second.which()) { - case VALUE_TYPE_FLOAT: - v = boost::get(it->second); - break; - case VALUE_TYPE_DOUBLE: - v = boost::get(it->second); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::getString(const folly::StringPiece name, - folly::StringPiece& v) const noexcept { - RU_CHECK_UPDATED_FIELDS(String) - - switch (it->second.which()) { - case VALUE_TYPE_STRING: - v = boost::get(it->second); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -ResultType RowUpdater::getVid(const folly::StringPiece name, - int64_t& v) const noexcept { - RU_CHECK_UPDATED_FIELDS(Vid) - - switch (it->second.which()) { - case VALUE_TYPE_INT: - v = boost::get(it->second); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - 
return ResultType::SUCCEEDED; -} - - -#undef CHECK_UPDATED_FIELDS - -} // namespace nebula diff --git a/src/dataman/RowUpdater.h b/src/dataman/RowUpdater.h deleted file mode 100644 index 555c06bf8..000000000 --- a/src/dataman/RowUpdater.h +++ /dev/null @@ -1,124 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#ifndef DATAMAN_ROWUPDATER_H_ -#define DATAMAN_ROWUPDATER_H_ - - -#include "base/Base.h" -#include "gen-cpp2/graph_types.h" -#include "dataman/DataCommon.h" -#include "meta/SchemaProviderIf.h" -#include "dataman/RowReader.h" - -namespace nebula { - -/** - * This class is mainly used to update an existing row. The schema has - * to be provided. The existing row is optional. If not provided, a default - * value will be used for any un-assigned field - */ -class RowUpdater { -public: - // reader holds the original data - // schema is the writer schema, which means the updated data will be encoded - // using this schema - RowUpdater(std::unique_ptr reader, - std::shared_ptr schema); - explicit RowUpdater(std::shared_ptr schema); - - // Encode into a binary array - std::string encode() const noexcept; - // Encode and append to the given string - // For the sake of performance, the caller needs to make sure the string - // is large enough, so that resize will not happen - void encodeTo(std::string& encoded) const noexcept; - - /** - * Accessors - */ - ResultType setBool(const folly::StringPiece name, bool v) noexcept; - ResultType getBool(const folly::StringPiece name, bool& v) const noexcept; - - template - typename std::enable_if::value, ResultType>::type - setInt(const folly::StringPiece name, T v) noexcept; - - template - typename std::enable_if::value, ResultType>::type - getInt(const folly::StringPiece name, T& v) const noexcept; - - ResultType setFloat(const folly::StringPiece name, float v) 
noexcept; - ResultType getFloat(const folly::StringPiece name, float& v) const noexcept; - - ResultType setDouble(const folly::StringPiece name, double v) noexcept; - ResultType getDouble(const folly::StringPiece name, double& v) const noexcept; - - ResultType setString(const folly::StringPiece name, - folly::StringPiece v) noexcept; - ResultType getString(const folly::StringPiece name, - folly::StringPiece& v) const noexcept; - - ResultType setVid(const folly::StringPiece name, int64_t v) noexcept; - ResultType getVid(const folly::StringPiece name, int64_t& v) const noexcept; - - - // TODO getPath(const std::string& name) const noexcept; - // TODO getList(const std::string& name) const noexcept; - // TODO getSet(const std::string& name) const noexcept; - // TODO getMap(const std::string& name) const noexcept; - -private: - std::shared_ptr schema_; - std::unique_ptr reader_; - // Hash64(field_name) => value - std::unordered_map updatedFields_; -}; - -} // namespace nebula - - -#define RU_GET_TYPE_BY_NAME() \ - const cpp2::ValueType& type \ - = schema_->getFieldType(name); \ - if (type == CommonConstants::kInvalidValueType()) { \ - return ResultType::E_NAME_NOT_FOUND; \ - } - - -#define RU_CHECK_UPDATED_FIELDS(FN) \ - uint64_t hash = folly::hash::SpookyHashV2::Hash64(name.begin(), \ - name.size(), \ - 0); \ - auto it = updatedFields_.find(hash); \ - if (it == updatedFields_.end()) { \ - if (!reader_) { \ - return ResultType::E_NAME_NOT_FOUND; \ - } else { \ - return reader_->get ## FN (name, v); \ - } \ - } - -#define RU_OUTPUT_VALUE(VT, FN, DV) \ - VT val; \ - auto res = get ## FN(it->getName(), val); \ - if (res != ResultType::SUCCEEDED) { \ - if (res == ResultType::E_NAME_NOT_FOUND) { \ - /* Use a default value */ \ - writer << DV; \ - } else { \ - LOG(ERROR) << "Failed to encode updated data"; \ - return; \ - } \ - } else { \ - writer << val; \ - } - -#include "dataman/RowUpdater.inl" - -#endif // DATAMAN_ROWUPDATER_H_ - - diff --git a/src/dataman/RowUpdater.inl 
b/src/dataman/RowUpdater.inl deleted file mode 100644 index 3dff4ee98..000000000 --- a/src/dataman/RowUpdater.inl +++ /dev/null @@ -1,48 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -namespace nebula { - -template -typename std::enable_if::value, ResultType>::type -RowUpdater::setInt(const folly::StringPiece name, T v) noexcept { - RU_GET_TYPE_BY_NAME() - - uint64_t hash; - switch (type.get_type()) { - case cpp2::SupportedType::INT: - case cpp2::SupportedType::TIMESTAMP: - hash = folly::hash::SpookyHashV2::Hash64(name.begin(), - name.size(), - 0); - updatedFields_[hash] = (int64_t)v; - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - - -template -typename std::enable_if::value, ResultType>::type -RowUpdater::getInt(const folly::StringPiece name, T& v) const noexcept { - RU_CHECK_UPDATED_FIELDS(Int) - - switch (it->second.which()) { - case VALUE_TYPE_INT: - v = boost::get(it->second); - break; - default: - return ResultType::E_INCOMPATIBLE_TYPE; - } - - return ResultType::SUCCEEDED; -} - -} // namespace nebula - diff --git a/src/dataman/test/CMakeLists.txt b/src/dataman/test/CMakeLists.txt deleted file mode 100644 index 1e6470161..000000000 --- a/src/dataman/test/CMakeLists.txt +++ /dev/null @@ -1,64 +0,0 @@ -set(DATAMAN_TEST_LIBS - $ - $ - $ - $ - $ - $ - $ -) - - -nebula_add_test( - NAME row_reader_test - SOURCES RowReaderTest.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle gtest -) - - -nebula_add_test( - NAME row_writer_test - SOURCES RowWriterTest.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle gtest -) - - -nebula_add_test( - NAME row_updater_test - SOURCES RowUpdaterTest.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle gtest -) - - -nebula_add_test( - NAME 
rowset_reader_writer_test - SOURCES RowSetReaderWriterTest.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle gtest -) - - -nebula_add_executable( - NAME row_writer_bm - SOURCES RowWriterBenchmark.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} follybenchmark wangle boost_regex -) - - -nebula_add_executable( - NAME row_reader_bm - SOURCES RowReaderBenchmark.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} follybenchmark wangle boost_regex -) - -nebula_add_test( - NAME nebula_codec_test - SOURCES NebulaCodecTest.cpp - OBJECTS ${DATAMAN_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle gtest -)