Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-line parsing supports SIMD optimization #1872

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat
option(WITHOUTGDB "Build Logtail without gdb")
option(WITHSPL "Build Logtail and UT with SPL" ON)
option(BUILD_LOGTAIL_UT "Build unit test for Logtail")
option(ENABLE_SIMD "Enable vectorization support" ON)
set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH
set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH

Expand All @@ -53,6 +54,16 @@ if (NOT WITHSPL)
add_definitions(-D__EXCLUDE_SPL__)
endif()

if (ENABLE_SIMD)
include(CheckCXXCompilerFlag)
check_cxx_compiler_flag("-mavx2" COMPILER_SUPPORTS_AVX2)

if (COMPILER_SUPPORTS_AVX2)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=haswell")
add_definitions(-DUSE_SIMD_AVX2)
endif ()
endif ()

# Default C/CXX flags.
if (UNIX)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fpic -fPIC -D_LARGEFILE64_SOURCE")
Expand Down Expand Up @@ -251,4 +262,4 @@ if (BUILD_LOGTAIL_UT)
include(CTest)
enable_testing()
add_subdirectory("${UNITTEST_PATH}" "${CMAKE_BINARY_DIR}/unittest")
endif ()
endif ()
24 changes: 24 additions & 0 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <limits>
#include <numeric>
#include <random>
#ifdef USE_SIMD_AVX2
#include <immintrin.h>
#endif

#include "app_config/AppConfig.h"
#include "checkpoint/CheckPointManager.h"
Expand Down Expand Up @@ -2197,7 +2200,27 @@ LineInfo RawTextParser::GetLastLine(StringView buffer,
if (protocolFunctionIndex != 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
}
#ifdef USE_SIMD_AVX2
const char * data = buffer.data();
const int vecSize = 32;
__m256i newlineVec = _mm256_set1_epi8('\n');

for (int32_t pos = end - vecSize; pos >= 0; pos -= vecSize) {
__m256i chunk = _mm256_loadu_si256(reinterpret_cast<const __m256i *>(data + pos));
__m256i cmp = _mm256_cmpeq_epi8(chunk, newlineVec);
int mask = _mm256_movemask_epi8(cmp);

if (mask != 0) {
int offset = __builtin_ctz(mask);
int32_t begin = pos + offset + 1;
return {.data = StringView(data + begin, end - begin),
.lineBegin = begin,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
}
}
#else
for (int32_t begin = end; begin > 0; --begin) {
if (begin == 0 || buffer[begin - 1] == '\n') {
return {.data = StringView(buffer.data() + begin, end - begin),
Expand All @@ -2207,6 +2230,7 @@ LineInfo RawTextParser::GetLastLine(StringView buffer,
.fullLine = true};
}
}
#endif
return {.data = StringView(buffer.data(), end),
.lineBegin = 0,
.lineEnd = end,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <boost/regex.hpp>
#include <string>
#ifdef USE_SIMD_AVX2
#include <immintrin.h>
#endif

#include "app_config/AppConfig.h"
#include "constants/Constants.h"
Expand Down Expand Up @@ -364,11 +367,36 @@ StringView ProcessorSplitMultilineLogStringNative::GetNextLine(StringView log, s
return StringView();
}

#ifdef USE_SIMD_AVX2
const char * data = log.data();
const int vecSize = 32;
__m256i newlineVec = _mm256_set1_epi8('\n');

size_t pos = begin;
for (; pos + vecSize <= log.size(); pos += vecSize) {
__m256i chunk = _mm256_loadu_si256(reinterpret_cast<const __m256i *>(data + pos));
__m256i cmp = _mm256_cmpeq_epi8(chunk, newlineVec);
int mask = _mm256_movemask_epi8(cmp);

if (mask != 0) {
int offset = __builtin_ctz(mask);
return StringView(data + begin, pos + offset - begin);
}
}

// 处理不足32字节
for (; pos < log.size(); ++pos) {
if (log[pos] == '\n') {
return StringView(data + begin, pos - begin);
}
}
#else
for (size_t end = begin; end < log.size(); ++end) {
if (log[end] == '\n') {
return StringView(log.data() + begin, end - begin);
}
}
#endif
return StringView(log.data() + begin, log.size() - begin);
}

Expand Down
Loading