Skip to content

Commit

Permalink
feat(clp-s): Add support for ingesting logs from S3. (#639)
Browse files Browse the repository at this point in the history
Co-authored-by: wraymo <[email protected]>
  • Loading branch information
gibber9809 and wraymo authored Jan 8, 2025
1 parent 818be9e commit 5d3b671
Show file tree
Hide file tree
Showing 24 changed files with 717 additions and 217 deletions.
4 changes: 3 additions & 1 deletion components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ set(SOURCE_FILES_clp_s_unitTest
src/clp_s/FileReader.hpp
src/clp_s/FileWriter.cpp
src/clp_s/FileWriter.hpp
src/clp_s/InputConfig.cpp
src/clp_s/InputConfig.hpp
src/clp_s/JsonConstructor.cpp
src/clp_s/JsonConstructor.hpp
src/clp_s/JsonFileIterator.cpp
Expand Down Expand Up @@ -613,7 +615,7 @@ target_include_directories(unitTest
target_link_libraries(unitTest
PRIVATE
absl::flat_hash_map
Boost::filesystem Boost::iostreams Boost::program_options Boost::regex
Boost::filesystem Boost::iostreams Boost::program_options Boost::regex Boost::url
${CURL_LIBRARIES}
fmt::fmt
kql
Expand Down
23 changes: 17 additions & 6 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@
#include <string_view>

#include "archive_constants.hpp"
#include "InputConfig.hpp"
#include "ReaderUtils.hpp"

using std::string_view;

namespace clp_s {
void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& network_auth) {
if (m_is_open) {
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
}
m_is_open = true;
m_archive_id = archive_id;
std::filesystem::path archive_path{archives_dir};
archive_path /= m_archive_id;
auto const archive_path_str = archive_path.string();

if (false == get_archive_id_from_path(archive_path, m_archive_id)) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}

if (InputSource::Filesystem != archive_path.source) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}

if (false == std::filesystem::is_directory(archive_path.path)) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}
auto const archive_path_str = archive_path.path;

m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str);
Expand Down Expand Up @@ -198,8 +208,9 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
column_reader = new DateStringColumnReader(column_id, m_timestamp_dict);
break;
// No need to push columns without associated object readers into the SchemaReader.
case NodeType::Object:
case NodeType::Metadata:
case NodeType::NullValue:
case NodeType::Object:
case NodeType::StructuredArray:
case NodeType::Unknown:
break;
Expand Down
9 changes: 4 additions & 5 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
#include <string_view>
#include <utility>

#include <boost/filesystem.hpp>

#include "DictionaryReader.hpp"
#include "InputConfig.hpp"
#include "PackedStreamReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
Expand All @@ -32,10 +31,10 @@ class ArchiveReader {

/**
* Opens an archive for reading.
* @param archives_dir
* @param archive_id
* @param archive_path
* @param network_auth
*/
void open(std::string_view archives_dir, std::string_view archive_id);
void open(Path const& archive_path, NetworkAuthOption const& network_auth);

/**
* Reads the dictionaries and metadata.
Expand Down
5 changes: 3 additions & 2 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,10 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
case NodeType::DateString:
writer->append_column(new DateStringColumnWriter(id));
break;
case NodeType::StructuredArray:
case NodeType::Object:
case NodeType::Metadata:
case NodeType::NullValue:
case NodeType::Object:
case NodeType::StructuredArray:
case NodeType::Unknown:
break;
}
Expand Down
1 change: 0 additions & 1 deletion components/core/src/clp_s/ArchiveWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <string_view>
#include <utility>

#include <boost/filesystem.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>

Expand Down
27 changes: 26 additions & 1 deletion components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ add_subdirectory(search/kql)

set(
CLP_SOURCES
../clp/aws/AwsAuthenticationSigner.cpp
../clp/aws/AwsAuthenticationSigner.hpp
../clp/BoundedReader.cpp
../clp/BoundedReader.hpp
../clp/CurlDownloadHandler.cpp
../clp/CurlDownloadHandler.hpp
../clp/CurlEasyHandle.hpp
../clp/CurlGlobalInstance.cpp
../clp/CurlGlobalInstance.hpp
../clp/CurlOperationFailed.hpp
../clp/CurlStringList.hpp
../clp/cli_utils.cpp
../clp/cli_utils.hpp
../clp/database_utils.cpp
Expand All @@ -28,11 +39,15 @@ set(
../clp/ffi/Value.hpp
../clp/FileDescriptor.cpp
../clp/FileDescriptor.hpp
../clp/FileReader.cpp
../clp/FileReader.hpp
../clp/GlobalMetadataDB.hpp
../clp/GlobalMetadataDBConfig.cpp
../clp/GlobalMetadataDBConfig.hpp
../clp/GlobalMySQLMetadataDB.cpp
../clp/GlobalMySQLMetadataDB.hpp
../clp/hash_utils.cpp
../clp/hash_utils.hpp
../clp/ir/EncodedTextAst.cpp
../clp/ir/EncodedTextAst.hpp
../clp/ir/parsing.cpp
Expand All @@ -43,18 +58,24 @@ set(
../clp/MySQLParamBindings.hpp
../clp/MySQLPreparedStatement.cpp
../clp/MySQLPreparedStatement.hpp
../clp/NetworkReader.cpp
../clp/NetworkReader.hpp
../clp/networking/socket_utils.cpp
../clp/networking/socket_utils.hpp
../clp/ReaderInterface.cpp
../clp/ReaderInterface.hpp
../clp/ReadOnlyMemoryMappedFile.cpp
../clp/ReadOnlyMemoryMappedFile.hpp
../clp/spdlog_with_specializations.hpp
../clp/streaming_archive/ArchiveMetadata.cpp
../clp/streaming_archive/ArchiveMetadata.hpp
../clp/streaming_compression/zstd/Decompressor.cpp
../clp/streaming_compression/zstd/Decompressor.hpp
../clp/Thread.cpp
../clp/Thread.hpp
../clp/TraceableException.hpp
../clp/time_types.hpp
../clp/type_utils.hpp
../clp/utf8_utils.cpp
../clp/utf8_utils.hpp
../clp/WriterInterface.cpp
Expand Down Expand Up @@ -89,6 +110,8 @@ set(
FileReader.hpp
FileWriter.cpp
FileWriter.hpp
InputConfig.cpp
InputConfig.hpp
JsonConstructor.cpp
JsonConstructor.hpp
JsonFileIterator.cpp
Expand Down Expand Up @@ -226,12 +249,14 @@ target_link_libraries(
clp-s
PRIVATE
absl::flat_hash_map
Boost::filesystem Boost::iostreams Boost::program_options
Boost::iostreams Boost::program_options Boost::regex Boost::url
${CURL_LIBRARIES}
clp::string_utils
kql
MariaDBClient::MariaDBClient
${MONGOCXX_TARGET}
msgpack-cxx
OpenSSL::Crypto
simdjson
spdlog::spdlog
yaml-cpp::yaml-cpp
Expand Down
Loading

0 comments on commit 5d3b671

Please sign in to comment.