From 5d3b67145876d90defa45aeab0a37e9ad48aaddf Mon Sep 17 00:00:00 2001 From: Devin Gibson Date: Wed, 8 Jan 2025 15:50:36 -0500 Subject: [PATCH] feat(clp-s): Add support for ingesting logs from S3. (#639) Co-authored-by: wraymo <37269683+wraymo@users.noreply.github.com> --- components/core/CMakeLists.txt | 4 +- components/core/src/clp_s/ArchiveReader.cpp | 23 ++- components/core/src/clp_s/ArchiveReader.hpp | 9 +- components/core/src/clp_s/ArchiveWriter.cpp | 5 +- components/core/src/clp_s/ArchiveWriter.hpp | 1 - components/core/src/clp_s/CMakeLists.txt | 27 +++- .../core/src/clp_s/CommandLineArguments.cpp | 148 ++++++++++++++---- .../core/src/clp_s/CommandLineArguments.hpp | 13 +- components/core/src/clp_s/InputConfig.cpp | 88 +++++++++++ components/core/src/clp_s/InputConfig.hpp | 105 +++++++++++++ components/core/src/clp_s/JsonConstructor.cpp | 17 +- components/core/src/clp_s/JsonConstructor.hpp | 7 +- .../core/src/clp_s/JsonFileIterator.cpp | 19 +-- .../core/src/clp_s/JsonFileIterator.hpp | 16 +- components/core/src/clp_s/JsonParser.cpp | 61 +++++--- components/core/src/clp_s/JsonParser.hpp | 8 +- components/core/src/clp_s/ReaderUtils.cpp | 83 ++++++++-- components/core/src/clp_s/ReaderUtils.hpp | 14 +- components/core/src/clp_s/Utils.cpp | 134 ++++++++++++++-- components/core/src/clp_s/Utils.hpp | 50 +++++- .../core/src/clp_s/ZstdDecompressor.cpp | 12 +- components/core/src/clp_s/clp-s.cpp | 75 +++------ .../core/src/clp_s/search/kql/CMakeLists.txt | 1 + .../core/tests/test-clp_s-end_to_end.cpp | 14 +- 24 files changed, 717 insertions(+), 217 deletions(-) create mode 100644 components/core/src/clp_s/InputConfig.cpp create mode 100644 components/core/src/clp_s/InputConfig.hpp diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 0995a0afb..f07a7db19 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -275,6 +275,8 @@ set(SOURCE_FILES_clp_s_unitTest src/clp_s/FileReader.hpp src/clp_s/FileWriter.cpp 
src/clp_s/FileWriter.hpp + src/clp_s/InputConfig.cpp + src/clp_s/InputConfig.hpp src/clp_s/JsonConstructor.cpp src/clp_s/JsonConstructor.hpp src/clp_s/JsonFileIterator.cpp @@ -613,7 +615,7 @@ target_include_directories(unitTest target_link_libraries(unitTest PRIVATE absl::flat_hash_map - Boost::filesystem Boost::iostreams Boost::program_options Boost::regex + Boost::filesystem Boost::iostreams Boost::program_options Boost::regex Boost::url ${CURL_LIBRARIES} fmt::fmt kql diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 7c68b301d..738e8e645 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -4,20 +4,30 @@ #include #include "archive_constants.hpp" +#include "InputConfig.hpp" #include "ReaderUtils.hpp" using std::string_view; namespace clp_s { -void ArchiveReader::open(string_view archives_dir, string_view archive_id) { +void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& network_auth) { if (m_is_open) { throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__); } m_is_open = true; - m_archive_id = archive_id; - std::filesystem::path archive_path{archives_dir}; - archive_path /= m_archive_id; - auto const archive_path_str = archive_path.string(); + + if (false == get_archive_id_from_path(archive_path, m_archive_id)) { + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + + if (InputSource::Filesystem != archive_path.source) { + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + + if (false == std::filesystem::is_directory(archive_path.path)) { + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + auto const archive_path_str = archive_path.path; m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str); m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str); @@ -198,8 +208,9 @@ BaseColumnReader* 
ArchiveReader::append_reader_column(SchemaReader& reader, int3 column_reader = new DateStringColumnReader(column_id, m_timestamp_dict); break; // No need to push columns without associated object readers into the SchemaReader. - case NodeType::Object: + case NodeType::Metadata: case NodeType::NullValue: + case NodeType::Object: case NodeType::StructuredArray: case NodeType::Unknown: break; diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 6b437dfd2..9e492720b 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -7,9 +7,8 @@ #include #include -#include - #include "DictionaryReader.hpp" +#include "InputConfig.hpp" #include "PackedStreamReader.hpp" #include "ReaderUtils.hpp" #include "SchemaReader.hpp" @@ -32,10 +31,10 @@ class ArchiveReader { /** * Opens an archive for reading. - * @param archives_dir - * @param archive_id + * @param archive_path + * @param network_auth */ - void open(std::string_view archives_dir, std::string_view archive_id); + void open(Path const& archive_path, NetworkAuthOption const& network_auth); /** * Reads the dictionaries and metadata. 
diff --git a/components/core/src/clp_s/ArchiveWriter.cpp b/components/core/src/clp_s/ArchiveWriter.cpp index d627479de..2a60013a9 100644 --- a/components/core/src/clp_s/ArchiveWriter.cpp +++ b/components/core/src/clp_s/ArchiveWriter.cpp @@ -270,9 +270,10 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& case NodeType::DateString: writer->append_column(new DateStringColumnWriter(id)); break; - case NodeType::StructuredArray: - case NodeType::Object: + case NodeType::Metadata: case NodeType::NullValue: + case NodeType::Object: + case NodeType::StructuredArray: case NodeType::Unknown: break; } diff --git a/components/core/src/clp_s/ArchiveWriter.hpp b/components/core/src/clp_s/ArchiveWriter.hpp index 82a0122bc..a76d15daf 100644 --- a/components/core/src/clp_s/ArchiveWriter.hpp +++ b/components/core/src/clp_s/ArchiveWriter.hpp @@ -4,7 +4,6 @@ #include #include -#include #include #include diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index 9ca0c947e..f1fa3857c 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -2,6 +2,17 @@ add_subdirectory(search/kql) set( CLP_SOURCES + ../clp/aws/AwsAuthenticationSigner.cpp + ../clp/aws/AwsAuthenticationSigner.hpp + ../clp/BoundedReader.cpp + ../clp/BoundedReader.hpp + ../clp/CurlDownloadHandler.cpp + ../clp/CurlDownloadHandler.hpp + ../clp/CurlEasyHandle.hpp + ../clp/CurlGlobalInstance.cpp + ../clp/CurlGlobalInstance.hpp + ../clp/CurlOperationFailed.hpp + ../clp/CurlStringList.hpp ../clp/cli_utils.cpp ../clp/cli_utils.hpp ../clp/database_utils.cpp @@ -28,11 +39,15 @@ set( ../clp/ffi/Value.hpp ../clp/FileDescriptor.cpp ../clp/FileDescriptor.hpp + ../clp/FileReader.cpp + ../clp/FileReader.hpp ../clp/GlobalMetadataDB.hpp ../clp/GlobalMetadataDBConfig.cpp ../clp/GlobalMetadataDBConfig.hpp ../clp/GlobalMySQLMetadataDB.cpp ../clp/GlobalMySQLMetadataDB.hpp + ../clp/hash_utils.cpp + ../clp/hash_utils.hpp 
../clp/ir/EncodedTextAst.cpp ../clp/ir/EncodedTextAst.hpp ../clp/ir/parsing.cpp @@ -43,18 +58,24 @@ set( ../clp/MySQLParamBindings.hpp ../clp/MySQLPreparedStatement.cpp ../clp/MySQLPreparedStatement.hpp + ../clp/NetworkReader.cpp + ../clp/NetworkReader.hpp ../clp/networking/socket_utils.cpp ../clp/networking/socket_utils.hpp ../clp/ReaderInterface.cpp ../clp/ReaderInterface.hpp ../clp/ReadOnlyMemoryMappedFile.cpp ../clp/ReadOnlyMemoryMappedFile.hpp + ../clp/spdlog_with_specializations.hpp ../clp/streaming_archive/ArchiveMetadata.cpp ../clp/streaming_archive/ArchiveMetadata.hpp ../clp/streaming_compression/zstd/Decompressor.cpp ../clp/streaming_compression/zstd/Decompressor.hpp + ../clp/Thread.cpp + ../clp/Thread.hpp ../clp/TraceableException.hpp ../clp/time_types.hpp + ../clp/type_utils.hpp ../clp/utf8_utils.cpp ../clp/utf8_utils.hpp ../clp/WriterInterface.cpp @@ -89,6 +110,8 @@ set( FileReader.hpp FileWriter.cpp FileWriter.hpp + InputConfig.cpp + InputConfig.hpp JsonConstructor.cpp JsonConstructor.hpp JsonFileIterator.cpp @@ -226,12 +249,14 @@ target_link_libraries( clp-s PRIVATE absl::flat_hash_map - Boost::filesystem Boost::iostreams Boost::program_options + Boost::iostreams Boost::program_options Boost::regex Boost::url + ${CURL_LIBRARIES} clp::string_utils kql MariaDBClient::MariaDBClient ${MONGOCXX_TARGET} msgpack-cxx + OpenSSL::Crypto simdjson spdlog::spdlog yaml-cpp::yaml-cpp diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 4218d9d60..e6cd07163 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -1,8 +1,11 @@ #include "CommandLineArguments.hpp" +#include #include +#include #include +#include #include #include "../clp/cli_utils.hpp" @@ -14,6 +17,10 @@ namespace po = boost::program_options; namespace clp_s { namespace { +// Authorization method constants +constexpr std::string_view cNoAuth{"none"}; +constexpr 
std::string_view cS3Auth{"s3"}; + /** * Read a list of newline-delimited paths from a file and put them into a vector passed by reference * TODO: deduplicate this code with the version in clp @@ -55,6 +62,55 @@ bool read_paths_from_file( } return true; } + +/** + * Validates and populates network authorization options. + * @param auth_method + * @param auth + * @throws std::invalid_argument if the authorization option is invalid + */ +void validate_network_auth(std::string_view auth_method, NetworkAuthOption& auth) { + if (cS3Auth == auth_method) { + auth.method = AuthMethod::S3PresignedUrlV4; + } else if (cNoAuth != auth_method) { + throw std::invalid_argument(fmt::format("Invalid authentication type \"{}\"", auth_method)); + } +} + +/** + * Validates and populates archive paths. + * @param archive_path + * @param archive_id + * @param archive_paths + * @throws std::invalid_argument on any error + */ +void validate_archive_paths( + std::string_view archive_path, + std::string_view archive_id, + std::vector& archive_paths +) { + if (archive_path.empty()) { + throw std::invalid_argument("No archive path specified"); + } + + if (false == archive_id.empty()) { + auto archive_fs_path = std::filesystem::path(archive_path) / archive_id; + std::error_code ec; + if (false == std::filesystem::exists(archive_fs_path, ec) || ec) { + throw std::invalid_argument("Requested archive does not exist"); + } + archive_paths.emplace_back(clp_s::Path{ + .source = clp_s::InputSource::Filesystem, + .path = archive_fs_path.string() + }); + } else if (false == get_input_archives_for_raw_path(archive_path, archive_paths)) { + throw std::invalid_argument("Invalid archive path"); + } + + if (archive_paths.empty()) { + throw std::invalid_argument("No archive paths specified"); + } +} } // namespace CommandLineArguments::ParsingResult @@ -133,6 +189,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { if (Command::Compress == m_command) { po::options_description 
compression_positional_options; + std::vector input_paths; // clang-format off compression_positional_options.add_options()( "archives-dir", @@ -140,7 +197,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "output directory" )( "input-paths", - po::value>(&m_file_paths)->value_name("PATHS"), + po::value>(&input_paths)->value_name("PATHS"), "input paths" ); // clang-format on @@ -151,6 +208,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { constexpr std::string_view cJsonFileType{"json"}; constexpr std::string_view cKeyValueIrFileType{"kv-ir"}; std::string file_type{cJsonFileType}; + std::string auth{cNoAuth}; // clang-format off compression_options.add_options()( "compression-level", @@ -209,6 +267,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "file-type", po::value(&file_type)->value_name("FILE_TYPE")->default_value(file_type), "The type of file being compressed (json or kv-ir)" + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables." 
); // clang-format on @@ -252,13 +318,19 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { } if (false == input_path_list_file_path.empty()) { - if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) { + if (false == read_paths_from_file(input_path_list_file_path, input_paths)) { SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path); return ParsingResult::Failure; } } - if (m_file_paths.empty()) { + for (auto const& path : input_paths) { + if (false == get_input_files_for_raw_path(path, m_input_paths)) { + throw std::invalid_argument(fmt::format("Invalid input path \"{}\".", path)); + } + } + + if (m_input_paths.empty()) { throw std::invalid_argument("No input paths specified."); } @@ -286,6 +358,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { throw std::invalid_argument("Unknown FILE_TYPE: " + file_type); } + validate_network_auth(auth, m_network_auth); + // Parse and validate global metadata DB config if (false == metadata_db_config_file_path.empty()) { clp::GlobalMetadataDBConfig metadata_db_config; @@ -310,11 +384,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { } } else if ((char)Command::Extract == command_input) { po::options_description extraction_options; + std::string archive_path; // clang-format off extraction_options.add_options()( - "archives-dir", - po::value(&m_archives_dir), - "The directory containing the archives" + "archive-path", + po::value(&archive_path), + "Path to a directory containing archives, or the path to a single archive" )( "output-dir", po::value(&m_output_dir), @@ -322,15 +397,9 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); // clang-format on - po::options_description input_options("Input Options"); - input_options.add_options()( - "archive-id", - po::value(&m_archive_id)->value_name("ID"), - "ID of the archive to decompress" - ); - extraction_options.add(input_options); - 
po::options_description decompression_options("Decompression Options"); + std::string auth{cNoAuth}; + std::string archive_id; // clang-format off decompression_options.add_options()( "ordered", @@ -343,6 +412,19 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ->value_name("SIZE"), "Chunk size (B) for each output file when decompressing records in log order." " When set to 0, no chunking is performed." + )( + "archive-id", + po::value(&archive_id)->value_name("ID"), + "Limit decompression to the archive with the given ID in a subdirectory of" + " archive-path" + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables." ); // clang-format on extraction_options.add(decompression_options); @@ -362,7 +444,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { extraction_options.add(output_metadata_options); po::positional_options_description positional_options; - positional_options.add("archives-dir", 1); + positional_options.add("archive-path", 1); positional_options.add("output-dir", 1); std::vector unrecognized_options @@ -390,16 +472,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { po::options_description visible_options; visible_options.add(general_options); - visible_options.add(input_options); visible_options.add(decompression_options); visible_options.add(output_metadata_options); std::cerr << visible_options << std::endl; return ParsingResult::InfoCommand; } - if (m_archives_dir.empty()) { - throw std::invalid_argument("No archives directory specified"); - } + validate_archive_paths(archive_path, archive_id, m_input_paths); + + validate_network_auth(auth, m_network_auth); if (m_output_dir.empty()) { throw std::invalid_argument("No output directory specified"); @@ -430,11 +511,12 @@ 
CommandLineArguments::parse_arguments(int argc, char const** argv) { po::options_description search_options; std::string output_handler_name; + std::string archive_path; // clang-format off search_options.add_options()( - "archives-dir", - po::value(&m_archives_dir), - "The directory containing the archives" + "archive-path", + po::value(&archive_path), + "Path to a directory containing archives, or the path to a single archive" )( "query,q", po::value(&m_query), @@ -448,12 +530,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); // clang-format on po::positional_options_description positional_options; - positional_options.add("archives-dir", 1); + positional_options.add("archive-path", 1); positional_options.add("query", 1); positional_options.add("output-handler", 1); positional_options.add("output-handler-args", -1); po::options_description match_options("Match Controls"); + std::string auth{cNoAuth}; + std::string archive_id; // clang-format off match_options.add_options()( "tge", @@ -469,8 +553,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "Ignore case distinctions between values in the query and the compressed data" )( "archive-id", - po::value(&m_archive_id)->value_name("ID"), - "Limit search to the archive with the given ID" + po::value(&archive_id)->value_name("ID"), + "Limit search to the archive with the given ID in a subdirectory of archive-path" )( "projection", po::value>(&m_projection_columns) @@ -479,6 +563,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "Project only the given set of columns for matching results. This option must be" " specified after all positional options. Values that are objects or structured" " arrays are currently unsupported." + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). 
Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables." ); // clang-format on search_options.add(match_options); @@ -630,9 +722,9 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { return ParsingResult::InfoCommand; } - if (m_archives_dir.empty()) { - throw std::invalid_argument("No archives directory specified"); - } + validate_archive_paths(archive_path, archive_id, m_input_paths); + + validate_network_auth(auth, m_network_auth); if (m_query.empty()) { throw std::invalid_argument("No query specified"); diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 47c244646..17ee77369 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -12,6 +12,7 @@ #include "../clp/GlobalMetadataDBConfig.hpp" #include "../reducer/types.hpp" #include "Defs.hpp" +#include "InputConfig.hpp" namespace clp_s { class CommandLineArguments { @@ -51,7 +52,9 @@ class CommandLineArguments { Command get_command() const { return m_command; } - std::vector const& get_file_paths() const { return m_file_paths; } + std::vector const& get_input_paths() const { return m_input_paths; } + + NetworkAuthOption const& get_network_auth() const { return m_network_auth; } std::string const& get_archives_dir() const { return m_archives_dir; } @@ -87,8 +90,6 @@ class CommandLineArguments { bool get_ignore_case() const { return m_ignore_case; } - std::string const& get_archive_id() const { return m_archive_id; } - std::optional const& get_metadata_db_config() const { return m_metadata_db_config; } @@ -177,7 +178,8 @@ class CommandLineArguments { Command m_command; // Compression and decompression variables - std::vector m_file_paths; + std::vector m_input_paths; + NetworkAuthOption m_network_auth{}; std::string m_archives_dir; std::string m_output_dir; std::string m_timestamp_key; @@ -213,9 +215,6 @@ 
class CommandLineArguments { bool m_ignore_case{false}; std::vector m_projection_columns; - // Decompression and search variables - std::string m_archive_id; - // Search aggregation variables std::string m_reducer_host; int m_reducer_port{-1}; diff --git a/components/core/src/clp_s/InputConfig.cpp b/components/core/src/clp_s/InputConfig.cpp new file mode 100644 index 000000000..83c19a5c3 --- /dev/null +++ b/components/core/src/clp_s/InputConfig.cpp @@ -0,0 +1,88 @@ +#include "InputConfig.hpp" + +#include +#include +#include +#include + +#include "Utils.hpp" + +namespace clp_s { +auto get_source_for_path(std::string_view const path) -> InputSource { + try { + return std::filesystem::exists(path) ? InputSource::Filesystem : InputSource::Network; + } catch (std::exception const& e) { + return InputSource::Network; + } +} + +auto get_path_object_for_raw_path(std::string_view const path) -> Path { + return Path{.source = get_source_for_path(path), .path = std::string{path}}; +} + +auto get_input_files_for_raw_path(std::string_view const path, std::vector& files) -> bool { + return get_input_files_for_path(get_path_object_for_raw_path(path), files); +} + +auto get_input_files_for_path(Path const& path, std::vector& files) -> bool { + if (InputSource::Network == path.source) { + files.emplace_back(path); + return true; + } + + if (false == std::filesystem::is_directory(path.path)) { + files.emplace_back(path); + return true; + } + + std::vector file_paths; + if (false == FileUtils::find_all_files_in_directory(path.path, file_paths)) { + return false; + } + + for (auto& file : file_paths) { + files.emplace_back(Path{.source = InputSource::Filesystem, .path = std::move(file)}); + } + return true; +} + +auto get_input_archives_for_raw_path(std::string_view const path, std::vector& archives) + -> bool { + return get_input_archives_for_path(get_path_object_for_raw_path(path), archives); +} + +auto get_input_archives_for_path(Path const& path, std::vector& archives) -> bool { + 
if (InputSource::Network == path.source) { + archives.emplace_back(path); + return true; + } + + if (false == std::filesystem::is_directory(path.path)) { + archives.emplace_back(path); + return true; + } + + std::vector archive_paths; + if (false == FileUtils::find_all_archives_in_directory(path.path, archive_paths)) { + return false; + } + + for (auto& archive : archive_paths) { + archives.emplace_back(Path{.source = InputSource::Filesystem, .path = std::move(archive)}); + } + return true; +} + +auto get_archive_id_from_path(Path const& archive_path, std::string& archive_id) -> bool { + switch (archive_path.source) { + case InputSource::Network: + return UriUtils::get_last_uri_component(archive_path.path, archive_id); + case InputSource::Filesystem: + return FileUtils::get_last_non_empty_path_component(archive_path.path, archive_id); + default: + return false; + } + return true; +} + +} // namespace clp_s diff --git a/components/core/src/clp_s/InputConfig.hpp b/components/core/src/clp_s/InputConfig.hpp new file mode 100644 index 000000000..3672bb0ae --- /dev/null +++ b/components/core/src/clp_s/InputConfig.hpp @@ -0,0 +1,105 @@ +#ifndef CLP_S_INPUTCONFIG_HPP +#define CLP_S_INPUTCONFIG_HPP + +#include +#include +#include +#include +#include + +namespace clp_s { +// Constants used for input configuration +constexpr char cAwsAccessKeyIdEnvVar[] = "AWS_ACCESS_KEY_ID"; +constexpr char cAwsSecretAccessKeyEnvVar[] = "AWS_SECRET_ACCESS_KEY"; + +/** + * Enum class defining the source of a resource. + */ +enum class InputSource : uint8_t { + Filesystem, + Network +}; + +/** + * Enum class defining the authentication method required for accessing a resource. + */ +enum class AuthMethod : uint8_t { + None, + S3PresignedUrlV4 +}; + +/** + * Struct encapsulating information needed to authenticate network requests. + */ +struct NetworkAuthOption { + AuthMethod method{AuthMethod::None}; +}; + +/** + * Struct representing a resource path with its source type. 
+ */ +struct Path { + InputSource source{InputSource::Filesystem}; + std::string path; +}; + +/** + * Determines the input source for a given raw path or url. + * @param path + * @return the InputSource for the given path + */ +[[nodiscard]] auto get_source_for_path(std::string_view const path) -> InputSource; + +/** + * Determines the input source for a given raw path or url and converts the path into a Path object. + * @param path + * @return a Path object representing the raw path or url + */ +[[nodiscard]] auto get_path_object_for_raw_path(std::string_view const path) -> Path; + +/** + * Recursively collects all file paths from the given raw path, including the path itself. + * @param path + * @param files Returned paths + * @return true on success, false otherwise + */ +auto get_input_files_for_raw_path(std::string_view const path, std::vector& files) -> bool; + +/** + * Recursively collects all file paths that are children of the given Path, including the Path + * itself. + * @param path + * @param files Returned paths + * @return true on success, false otherwise + */ +[[nodiscard]] auto get_input_files_for_path(Path const& path, std::vector& files) -> bool; + +/** + * Collects all archives that are children of the given raw path, including the path itself. + * @param path + * @param archives Returned archives + * @return true on success, false otherwise + */ +[[nodiscard]] auto +get_input_archives_for_raw_path(std::string_view const path, std::vector& archives) -> bool; + +/** + * Collects all archives from the given Path, including the Path itself. + * @param path + * @param archives Returned archives + * @return true on success, false otherwise + */ +[[nodiscard]] auto +get_input_archives_for_path(Path const& path, std::vector& archives) -> bool; + +/** + * Determines the archive id of an archive based on the archive path. 
+ * @param archive_path + * @param archive_id Returned archive id + * @return true on success, false otherwise + */ +[[nodiscard]] auto +get_archive_id_from_path(Path const& archive_path, std::string& archive_id) -> bool; +} // namespace clp_s + +#endif // CLP_S_INPUTCONFIG_HPP diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 8886f2074..f1363549d 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -31,22 +31,11 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option ) ); } - - std::filesystem::path archive_path{m_option.archives_dir}; - archive_path /= m_option.archive_id; - if (false == std::filesystem::is_directory(archive_path)) { - throw OperationFailed( - ErrorCodeFailure, - __FILENAME__, - __LINE__, - fmt::format("'{}' is not a directory", archive_path.c_str()) - ); - } } void JsonConstructor::store() { m_archive_reader = std::make_unique(); - m_archive_reader->open(m_option.archives_dir, m_option.archive_id); + m_archive_reader->open(m_option.archive_path, m_option.network_auth); m_archive_reader->read_dictionaries_and_metadata(); if (m_option.ordered && false == m_archive_reader->has_log_order()) { @@ -84,7 +73,7 @@ void JsonConstructor::construct_in_order() { int64_t first_idx{}; int64_t last_idx{}; size_t chunk_size{}; - auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id; + auto src_path = std::filesystem::path(m_option.output_dir) / m_archive_reader->get_archive_id(); FileWriter writer; writer.open(src_path, FileWriter::OpenMode::CreateForWriting); @@ -123,7 +112,7 @@ void JsonConstructor::construct_in_order() { ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cStreamId, - m_option.archive_id + std::string{m_archive_reader->get_archive_id()} ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cBeginMsgIx, diff --git 
a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 3d9228a02..533d335b4 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -11,6 +11,7 @@ #include "DictionaryReader.hpp" #include "ErrorCode.hpp" #include "FileWriter.hpp" +#include "InputConfig.hpp" #include "SchemaReader.hpp" #include "SchemaTree.hpp" #include "TraceableException.hpp" @@ -26,12 +27,12 @@ struct MetadataDbOption { }; struct JsonConstructorOption { - std::string archives_dir; - std::string archive_id; + Path archive_path{}; + NetworkAuthOption network_auth{}; std::string output_dir; bool ordered{false}; size_t target_ordered_chunk_size{}; - std::optional metadata_db; + std::optional metadata_db{std::nullopt}; }; class JsonConstructor { diff --git a/components/core/src/clp_s/JsonFileIterator.cpp b/components/core/src/clp_s/JsonFileIterator.cpp index ad6d16cd0..a0a003d9f 100644 --- a/components/core/src/clp_s/JsonFileIterator.cpp +++ b/components/core/src/clp_s/JsonFileIterator.cpp @@ -7,28 +7,19 @@ namespace clp_s { JsonFileIterator::JsonFileIterator( - std::string const& file_name, + clp::ReaderInterface& reader, size_t max_document_size, size_t buf_size ) : m_buf_size(buf_size), m_max_document_size(max_document_size), - m_buf(new char[buf_size + simdjson::SIMDJSON_PADDING]) { - try { - m_reader.open(file_name); - } catch (FileReader::OperationFailed& e) { - SPDLOG_ERROR("Failed to open {} for reading - {}", file_name, e.what()); - return; - } - + m_buf(new char[buf_size + simdjson::SIMDJSON_PADDING]), + m_reader(reader) { read_new_json(); } JsonFileIterator::~JsonFileIterator() { delete[] m_buf; - if (m_reader.is_open()) { - m_reader.close(); - } } bool JsonFileIterator::read_new_json() { @@ -59,9 +50,9 @@ bool JsonFileIterator::read_new_json() { m_buf_occupied += size_read; m_bytes_read += size_read; - if (ErrorCodeEndOfFile == file_error) { + if (clp::ErrorCode::ErrorCode_EndOfFile == 
file_error) { m_eof = true; - } else if (ErrorCodeSuccess != file_error) { + } else if (clp::ErrorCode::ErrorCode_Success != file_error) { m_error_code = simdjson::error_code::IO_ERROR; return false; } diff --git a/components/core/src/clp_s/JsonFileIterator.hpp b/components/core/src/clp_s/JsonFileIterator.hpp index b8db3f4f2..5464d56df 100644 --- a/components/core/src/clp_s/JsonFileIterator.hpp +++ b/components/core/src/clp_s/JsonFileIterator.hpp @@ -3,13 +3,13 @@ #include -#include "FileReader.hpp" +#include "../clp/ReaderInterface.hpp" namespace clp_s { class JsonFileIterator { public: /** - * An iterator over a file containing json objects. JSON is parsed + * An iterator over an input stream containing json objects. JSON is parsed * using simdjson::parse_many. This allows simdjson to efficiently find * delimeters between JSON objects, and if enabled parse JSON ahead of time * in another thread while the JSON is being iterated over. @@ -17,12 +17,12 @@ class JsonFileIterator { * The buffer grows automatically if there are JSON objects larger than the buffer size. * The buffer is padded to be SIMDJSON_PADDING bytes larger than the specified size. 
- * @param file_name the file containing JSON + * @param reader the input stream containing JSON * @param max_document_size the maximum allowed size of a single document * @param buf_size the initial buffer size */ explicit JsonFileIterator( - std::string const& file_name, + clp::ReaderInterface& reader, size_t max_document_size, size_t buf_size = 1024 * 1024 /*1MB default*/ ); @@ -35,12 +35,6 @@ class JsonFileIterator { */ [[nodiscard]] bool get_json(simdjson::ondemand::document_stream::iterator& it); - /** - * Checks if the file is open - * @return true if the file opened successfully - */ - [[nodiscard]] bool is_open() const { return m_reader.is_open(); } - /** * @return number of truncated bytes after json documents */ @@ -86,7 +80,7 @@ class JsonFileIterator { size_t m_buf_occupied{0}; size_t m_max_document_size{0}; char* m_buf{nullptr}; - FileReader m_reader; + clp::ReaderInterface& m_reader; simdjson::ondemand::parser m_parser; simdjson::ondemand::document_stream m_stream; bool m_eof{false}; diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index c917b1f09..21e3b0cfd 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -2,12 +2,14 @@ #include #include +#include #include #include #include #include #include +#include #include #include @@ -19,11 +21,14 @@ #include "../clp/ffi/utils.hpp" #include "../clp/ffi/Value.hpp" #include "../clp/ir/EncodedTextAst.hpp" +#include "../clp/NetworkReader.hpp" +#include "../clp/ReaderInterface.hpp" #include "../clp/streaming_compression/zstd/Decompressor.hpp" #include "../clp/time_types.hpp" #include "archive_constants.hpp" #include "ErrorCode.hpp" #include "JsonFileIterator.hpp" +#include "JsonParser.hpp" using clp::ffi::ir_stream::Deserializer; using clp::ffi::ir_stream::IRErrorCode; @@ -79,11 +84,9 @@ JsonParser::JsonParser(JsonParserOption const& option) m_max_document_size(option.max_document_size), 
m_timestamp_key(option.timestamp_key), m_structurize_arrays(option.structurize_arrays), - m_record_log_order(option.record_log_order) { - if (false == FileUtils::validate_path(option.file_paths)) { - exit(1); - } - + m_record_log_order(option.record_log_order), + m_input_paths(option.input_paths), + m_network_auth(option.network_auth) { if (false == m_timestamp_key.empty()) { if (false == clp_s::StringUtils::tokenize_column_descriptor(m_timestamp_key, m_timestamp_column)) @@ -93,10 +96,6 @@ JsonParser::JsonParser(JsonParserOption const& option) } } - for (auto& file_path : option.file_paths) { - FileUtils::find_all_files(file_path, m_file_paths); - } - m_archive_options.archives_dir = option.archives_dir; m_archive_options.compression_level = option.compression_level; m_archive_options.print_archive_stats = option.print_archive_stats; @@ -490,18 +489,19 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s } bool JsonParser::parse() { - for (auto& file_path : m_file_paths) { - JsonFileIterator json_file_iterator(file_path, m_max_document_size); - if (false == json_file_iterator.is_open()) { + for (auto const& path : m_input_paths) { + auto reader{ReaderUtils::try_create_reader(path, m_network_auth)}; + if (nullptr == reader) { m_archive_writer->close(); return false; } + JsonFileIterator json_file_iterator(*reader, m_max_document_size); if (simdjson::error_code::SUCCESS != json_file_iterator.get_error()) { SPDLOG_ERROR( "Encountered error - {} - while trying to parse {} after parsing 0 bytes", simdjson::error_message(json_file_iterator.get_error()), - file_path + path.path ); m_archive_writer->close(); return false; @@ -535,7 +535,7 @@ bool JsonParser::parse() { SPDLOG_ERROR( "Encountered non-json-object while trying to parse {} after parsing {} " "bytes", - file_path, + path.path, bytes_consumed_up_to_prev_record ); m_archive_writer->close(); @@ -560,7 +560,7 @@ bool JsonParser::parse() { SPDLOG_ERROR( "Encountered error - {} - while 
trying to parse {} after parsing {} bytes", error.what(), - file_path, + path.path, bytes_consumed_up_to_prev_record ); m_archive_writer->close(); @@ -594,7 +594,7 @@ bool JsonParser::parse() { SPDLOG_ERROR( "Encountered error - {} - while trying to parse {} after parsing {} bytes", simdjson::error_message(json_file_iterator.get_error()), - file_path, + path.path, bytes_consumed_up_to_prev_record ); m_archive_writer->close(); @@ -604,9 +604,27 @@ bool JsonParser::parse() { SPDLOG_WARN( "Truncated JSON ({} bytes) at end of file {}", json_file_iterator.truncated_bytes(), - file_path.c_str() + path.path ); } + + if (auto network_reader = std::dynamic_pointer_cast(reader); + nullptr != network_reader) + { + if (auto const rc = network_reader->get_curl_ret_code(); + rc.has_value() && CURLcode::CURLE_OK != rc.value()) + { + auto const curl_error_message = network_reader->get_curl_error_msg(); + SPDLOG_ERROR( + "Encountered curl error while ingesting {} - Code: {} - Message: {}", + path.path, + static_cast(rc.value()), + curl_error_message.value_or("Unknown error") + ); + m_archive_writer->close(); + return false; + } + } } return true; } @@ -835,11 +853,16 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) { } auto JsonParser::parse_from_ir() -> bool { - for (auto& file_path : m_file_paths) { + for (auto const& path : m_input_paths) { + // TODO: add support for ingesting IR from a network source + if (InputSource::Filesystem != path.source) { + m_archive_writer->close(); + return false; + } clp::streaming_compression::zstd::Decompressor decompressor; size_t curr_pos{}; size_t last_pos{}; - decompressor.open(file_path); + decompressor.open(path.path); auto deserializer_result{Deserializer::create(decompressor, IrUnitHandler{}) }; diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index a89c746c7..12199df6c 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp 
@@ -23,7 +23,9 @@ #include "DictionaryWriter.hpp" #include "FileReader.hpp" #include "FileWriter.hpp" +#include "InputConfig.hpp" #include "ParsedMessage.hpp" +#include "ReaderUtils.hpp" #include "Schema.hpp" #include "SchemaMap.hpp" #include "SchemaTree.hpp" @@ -37,7 +39,7 @@ using clp::ffi::KeyValuePairLogEvent; namespace clp_s { struct JsonParserOption { - std::vector file_paths; + std::vector input_paths; CommandLineArguments::FileType input_file_type{CommandLineArguments::FileType::Json}; std::string timestamp_key; std::string archives_dir; @@ -50,6 +52,7 @@ struct JsonParserOption { bool record_log_order{true}; bool single_file_archive{false}; std::shared_ptr metadata_db; + NetworkAuthOption network_auth{}; }; class JsonParser { @@ -167,7 +170,8 @@ class JsonParser { int32_t add_metadata_field(std::string_view const field_name, NodeType type); int m_num_messages; - std::vector m_file_paths; + std::vector m_input_paths; + NetworkAuthOption m_network_auth{}; Schema m_current_schema; ParsedMessage m_current_parsed_message; diff --git a/components/core/src/clp_s/ReaderUtils.cpp b/components/core/src/clp_s/ReaderUtils.cpp index a2ab5a34a..88bb31286 100644 --- a/components/core/src/clp_s/ReaderUtils.cpp +++ b/components/core/src/clp_s/ReaderUtils.cpp @@ -1,6 +1,14 @@ #include "ReaderUtils.hpp" +#include + +#include "../clp/aws/AwsAuthenticationSigner.hpp" +#include "../clp/FileReader.hpp" +#include "../clp/NetworkReader.hpp" +#include "../clp/ReaderInterface.hpp" +#include "../clp/spdlog_with_specializations.hpp" #include "archive_constants.hpp" +#include "Utils.hpp" namespace clp_s { std::shared_ptr ReaderUtils::read_schema_tree(std::string const& archives_dir) { @@ -142,22 +150,75 @@ std::shared_ptr ReaderUtils::read_schemas(std::string co return schemas_pointer; } -std::vector ReaderUtils::get_archives(std::string const& archives_dir) { - std::vector archive_paths; +namespace { +std::shared_ptr try_create_file_reader(std::string_view const file_path) { + try { + 
return std::make_shared(std::string{file_path}); + } catch (clp::FileReader::OperationFailed const& e) { + SPDLOG_ERROR("Failed to open file for reading - {} - {}", file_path, e.what()); + return nullptr; + } +} - if (false == boost::filesystem::is_directory(archives_dir)) { - throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); +bool try_sign_url(std::string& url) { + auto const aws_access_key = std::getenv(cAwsAccessKeyIdEnvVar); + auto const aws_secret_access_key = std::getenv(cAwsSecretAccessKeyEnvVar); + if (nullptr == aws_access_key || nullptr == aws_secret_access_key) { + SPDLOG_ERROR( + "{} and {} environment variables not available for presigned url authentication.", + cAwsAccessKeyIdEnvVar, + cAwsSecretAccessKeyEnvVar + ); + return false; } - boost::filesystem::directory_iterator iter(archives_dir); - boost::filesystem::directory_iterator end; - for (; iter != end; ++iter) { - if (boost::filesystem::is_directory(iter->path())) { - archive_paths.push_back(iter->path().string()); + clp::aws::AwsAuthenticationSigner signer{aws_access_key, aws_secret_access_key}; + + try { + clp::aws::S3Url s3_url{url}; + if (auto const rc = signer.generate_presigned_url(s3_url, url); + clp::ErrorCode::ErrorCode_Success != rc) + { + return false; } + } catch (std::exception const& e) { + return false; } - - return archive_paths; + return true; } +std::shared_ptr +try_create_network_reader(std::string_view const url, NetworkAuthOption const& auth) { + std::string request_url{url}; + switch (auth.method) { + case AuthMethod::S3PresignedUrlV4: + if (false == try_sign_url(request_url)) { + return nullptr; + } + break; + case AuthMethod::None: + break; + default: + return nullptr; + } + + try { + return std::make_shared(request_url); + } catch (clp::NetworkReader::OperationFailed const& e) { + SPDLOG_ERROR("Failed to open url for reading - {}", e.what()); + return nullptr; + } +} +} // namespace + +std::shared_ptr +ReaderUtils::try_create_reader(Path const& path, 
NetworkAuthOption const& network_auth) { + if (InputSource::Filesystem == path.source) { + return try_create_file_reader(path.path); + } else if (InputSource::Network == path.source) { + return try_create_network_reader(path.path, network_auth); + } else { + return nullptr; + } +} } // namespace clp_s diff --git a/components/core/src/clp_s/ReaderUtils.hpp b/components/core/src/clp_s/ReaderUtils.hpp index caa509d6a..4661b5fae 100644 --- a/components/core/src/clp_s/ReaderUtils.hpp +++ b/components/core/src/clp_s/ReaderUtils.hpp @@ -1,7 +1,11 @@ #ifndef CLP_S_READERUTILS_HPP #define CLP_S_READERUTILS_HPP +#include + +#include "../clp/ReaderInterface.hpp" #include "DictionaryReader.hpp" +#include "InputConfig.hpp" #include "Schema.hpp" #include "SchemaReader.hpp" #include "SchemaTree.hpp" @@ -67,11 +71,13 @@ class ReaderUtils { ); /** - * Gets the list of archives in the given archive directory - * @param archives_dir - * @return the list of archives + * Tries to open a clp::ReaderInterface using the given Path and NetworkAuthOption. 
+ * @param path + * @param network_auth + * @return the opened clp::ReaderInterface or nullptr on error */ - static std::vector get_archives(std::string const& archives_dir); + static std::shared_ptr + try_create_reader(Path const& path, NetworkAuthOption const& network_auth); private: /** diff --git a/components/core/src/clp_s/Utils.cpp b/components/core/src/clp_s/Utils.cpp index acee48851..19b564c03 100644 --- a/components/core/src/clp_s/Utils.cpp +++ b/components/core/src/clp_s/Utils.cpp @@ -1,42 +1,52 @@ #include "Utils.hpp" -#include +#include +#include +#include + +#include +#include #include +#include "archive_constants.hpp" + using std::string; using std::string_view; namespace clp_s { -bool FileUtils::find_all_files(std::string const& path, std::vector& file_paths) { +bool FileUtils::find_all_files_in_directory( + std::string const& path, + std::vector& file_paths +) { try { - if (false == boost::filesystem::is_directory(path)) { + if (false == std::filesystem::is_directory(path)) { // path is a file file_paths.push_back(path); return true; } - if (boost::filesystem::is_empty(path)) { + if (std::filesystem::is_empty(path)) { // path is an empty directory return true; } // Iterate directory - boost::filesystem::recursive_directory_iterator iter( + std::filesystem::recursive_directory_iterator iter( path, - boost::filesystem::directory_options::follow_directory_symlink + std::filesystem::directory_options::follow_directory_symlink ); - boost::filesystem::recursive_directory_iterator end; + std::filesystem::recursive_directory_iterator end; for (; iter != end; ++iter) { // Check if current entry is an empty directory or a file - if (boost::filesystem::is_directory(iter->path())) { - if (boost::filesystem::is_empty(iter->path())) { + if (std::filesystem::is_directory(iter->path())) { + if (std::filesystem::is_empty(iter->path())) { iter.disable_recursion_pending(); } } else { file_paths.push_back(iter->path().string()); } } - } catch 
(boost::filesystem::filesystem_error& exception) { + } catch (std::exception const& exception) { SPDLOG_ERROR( "Failed to find files/directories at '{}' - {}.", path.c_str(), @@ -48,16 +58,106 @@ bool FileUtils::find_all_files(std::string const& path, std::vector return true; } -bool FileUtils::validate_path(std::vector const& paths) { - bool all_paths_exist = true; - for (auto const& path : paths) { - if (false == boost::filesystem::exists(path)) { - SPDLOG_ERROR("'{}' does not exist.", path.c_str()); - all_paths_exist = false; +namespace { +/** + * Determines if a directory is a multi-file archive. + * @param path + * @return true if this directory is a multi-file archive, false otherwise + */ +bool is_multi_file_archive(std::string_view const path) { + for (auto const& entry : std::filesystem::directory_iterator{path}) { + if (entry.is_directory()) { + return false; } + + std::string file_name; + if (false == FileUtils::get_last_non_empty_path_component(entry.path().string(), file_name)) + { + return false; + } + auto formatted_name = fmt::format("/{}", file_name); + if (constants::cArchiveTimestampDictFile == formatted_name + || constants::cArchiveSchemaTreeFile == formatted_name + || constants::cArchiveSchemaMapFile == formatted_name + || constants::cArchiveVarDictFile == formatted_name + || constants::cArchiveLogDictFile == formatted_name + || constants::cArchiveArrayDictFile == formatted_name + || constants::cArchiveTableMetadataFile == formatted_name + || constants::cArchiveTablesFile == formatted_name) + { + continue; + } else { + try { + auto segment_file_number = std::stoi(file_name); + continue; + } catch (std::exception const& e) { + return false; + } + } + } + return true; +} +} // namespace + +bool FileUtils::find_all_archives_in_directory( + std::string_view const path, + std::vector& archive_paths +) { + try { + if (false == std::filesystem::is_directory(path)) { + return false; + } + } catch (std::exception const& e) { + return false; } - return 
all_paths_exist; + if (is_multi_file_archive(path)) { + archive_paths.emplace_back(path); + return true; + } + + for (auto const& entry : std::filesystem::directory_iterator{path}) { + archive_paths.emplace_back(entry.path().string()); + } + return true; +} + +bool FileUtils::get_last_non_empty_path_component(std::string_view const path, std::string& name) { + std::filesystem::path fs_path; + try { + fs_path = std::filesystem::path{path}.lexically_normal(); + } catch (std::exception const& e) { + return false; + } + + if (fs_path.has_filename() && false == fs_path.filename().string().empty()) { + name = fs_path.filename().string(); + return true; + } + + while (fs_path.has_parent_path()) { + fs_path = fs_path.parent_path(); + if (fs_path.has_filename() && false == fs_path.filename().string().empty()) { + name = fs_path.filename().string(); + return true; + } + } + + return false; +} + +bool UriUtils::get_last_uri_component(std::string_view const uri, std::string& name) { + auto parsed_result = boost::urls::parse_uri(uri); + if (false == parsed_result.has_value()) { + return false; + } + auto parsed_uri = parsed_result.value(); + auto path_segments_view = parsed_uri.segments(); + if (path_segments_view.empty()) { + return false; + } + name = path_segments_view.back(); + return true; } bool StringUtils::get_bounds_of_next_var(string const& msg, size_t& begin_pos, size_t& end_pos) { diff --git a/components/core/src/clp_s/Utils.hpp b/components/core/src/clp_s/Utils.hpp index 553f7e608..0181a6749 100644 --- a/components/core/src/clp_s/Utils.hpp +++ b/components/core/src/clp_s/Utils.hpp @@ -5,26 +5,60 @@ #include #include #include - -#include +#include +#include namespace clp_s { class FileUtils { public: /** - * Find all files in a directory + * Finds all files in a directory * @param path * @param file_paths * @return true if successful, false otherwise */ - static bool find_all_files(std::string const& path, std::vector& file_paths); + static bool + 
find_all_files_in_directory(std::string const& path, std::vector& file_paths); /** - * Validate if all paths exist - * @param paths - * @return true if all paths exist, false otherwise + * Finds all archives in a directory, including the directory itself + * @param path + * @param archive_paths + * @return true if successful, false otherwise + */ + static bool find_all_archives_in_directory( + std::string_view const path, + std::vector& archive_paths + ); + + /** + * Gets the last non-empty component of a path, accounting for trailing forward slashes. + * + * For example: + * ./foo/bar.baz -> bar.baz + * ./foo/bar.baz/ -> bar.baz + * + * @param path + * @param name Returned component name + * @return true on success, false otherwise + */ + static bool get_last_non_empty_path_component(std::string_view const path, std::string& name); +}; + +class UriUtils { +public: + /** + * Gets the last component of a uri. + * + * For example: + * https://www.something.org/abc-xyz -> abc-xyz + * https://www.something.org/aaa/bbb/abc-xyz?something=something -> abc-xyz + * + * @param uri + * @param name Returned component name + * @return true on success, false otherwise */ - static bool validate_path(std::vector const& paths); + static bool get_last_uri_component(std::string_view const uri, std::string& name); }; class StringUtils { diff --git a/components/core/src/clp_s/ZstdDecompressor.cpp b/components/core/src/clp_s/ZstdDecompressor.cpp index 87d3ae8fa..c6c7f99e7 100644 --- a/components/core/src/clp_s/ZstdDecompressor.cpp +++ b/components/core/src/clp_s/ZstdDecompressor.cpp @@ -3,8 +3,9 @@ #include "ZstdDecompressor.hpp" #include +#include -#include +#include #include namespace clp_s { @@ -202,14 +203,13 @@ ErrorCode ZstdDecompressor::open(std::string const& compressed_file_path) { m_input_type = InputType::MemoryMappedCompressedFile; // Create memory mapping for compressed_file_path, use boost read only memory mapped file - boost::system::error_code boost_error_code; - size_t 
compressed_file_size - = boost::filesystem::file_size(compressed_file_path, boost_error_code); - if (boost_error_code) { + std::error_code error_code; + size_t compressed_file_size = std::filesystem::file_size(compressed_file_path, error_code); + if (error_code) { SPDLOG_ERROR( "ZstdDecompressor: Unable to obtain file size for '{}' - {}.", compressed_file_path.c_str(), - boost_error_code.message().c_str() + error_code.message().c_str() ); return ErrorCodeFailure; } diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 0f7b5643a..c21a1f3d3 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -11,6 +12,7 @@ #include #include +#include "../clp/CurlGlobalInstance.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" #include "../clp/streaming_archive/ArchiveMetadata.hpp" #include "../reducer/network_utils.hpp" @@ -18,7 +20,6 @@ #include "Defs.hpp" #include "JsonConstructor.hpp" #include "JsonParser.hpp" -#include "ReaderUtils.hpp" #include "search/AddTimestampConditions.hpp" #include "search/ConvertToExists.hpp" #include "search/EmptyExpr.hpp" @@ -87,7 +88,8 @@ bool compress(CommandLineArguments const& command_line_arguments) { } clp_s::JsonParserOption option{}; - option.file_paths = command_line_arguments.get_file_paths(); + option.input_paths = command_line_arguments.get_input_paths(); + option.network_auth = command_line_arguments.get_network_auth(); option.input_file_type = command_line_arguments.get_file_type(); option.archives_dir = archives_dir.string(); option.target_encoded_size = command_line_arguments.get_target_encoded_size(); @@ -281,6 +283,7 @@ int main(int argc, char const* argv[]) { clp_s::TimestampPattern::init(); mongocxx::instance const mongocxx_instance{}; + clp::CurlGlobalInstance const curl_instance{}; CommandLineArguments command_line_arguments("clp-s"); auto parsing_result = command_line_arguments.parse_arguments(argc, 
argv); @@ -299,37 +302,21 @@ int main(int argc, char const* argv[]) { return 1; } } else if (CommandLineArguments::Command::Extract == command_line_arguments.get_command()) { - auto const& archives_dir = command_line_arguments.get_archives_dir(); - if (false == std::filesystem::is_directory(archives_dir)) { - SPDLOG_ERROR("'{}' is not a directory.", archives_dir); - return 1; - } - clp_s::JsonConstructorOption option{}; option.output_dir = command_line_arguments.get_output_dir(); option.ordered = command_line_arguments.get_ordered_decompression(); - option.archives_dir = archives_dir; option.target_ordered_chunk_size = command_line_arguments.get_target_ordered_chunk_size(); + option.network_auth = command_line_arguments.get_network_auth(); if (false == command_line_arguments.get_mongodb_uri().empty()) { option.metadata_db = {command_line_arguments.get_mongodb_uri(), command_line_arguments.get_mongodb_collection()}; } + try { - auto const& archive_id = command_line_arguments.get_archive_id(); - if (false == archive_id.empty()) { - option.archive_id = archive_id; + for (auto const& archive_path : command_line_arguments.get_input_paths()) { + option.archive_path = archive_path; decompress_archive(option); - } else { - for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) { - if (false == entry.is_directory()) { - // Skip non-directories - continue; - } - - option.archive_id = entry.path().filename(); - decompress_archive(option); - } } } catch (clp_s::TraceableException& e) { SPDLOG_ERROR("{}", e.what()); @@ -348,12 +335,6 @@ int main(int argc, char const* argv[]) { return 1; } - auto const& archives_dir = command_line_arguments.get_archives_dir(); - if (false == std::filesystem::is_directory(archives_dir)) { - SPDLOG_ERROR("'{}' is not a directory.", archives_dir); - return 1; - } - int reducer_socket_fd{-1}; if (command_line_arguments.get_output_handler_type() == CommandLineArguments::OutputHandlerType::Reducer) @@ -369,37 +350,25 @@ int 
main(int argc, char const* argv[]) { } } - auto const& archive_id = command_line_arguments.get_archive_id(); auto archive_reader = std::make_shared(); - if (false == archive_id.empty()) { - archive_reader->open(archives_dir, archive_id); + for (auto const& archive_path : command_line_arguments.get_input_paths()) { + try { + archive_reader->open(archive_path, command_line_arguments.get_network_auth()); + } catch (std::exception const& e) { + SPDLOG_ERROR("Failed to open archive - {}", e.what()); + return 1; + } if (false - == search_archive(command_line_arguments, archive_reader, expr, reducer_socket_fd)) + == search_archive( + command_line_arguments, + archive_reader, + expr->copy(), + reducer_socket_fd + )) { return 1; } archive_reader->close(); - } else { - for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) { - if (false == entry.is_directory()) { - // Skip non-directories - continue; - } - - auto const archive_id = entry.path().filename().string(); - archive_reader->open(archives_dir, archive_id); - if (false - == search_archive( - command_line_arguments, - archive_reader, - expr->copy(), - reducer_socket_fd - )) - { - return 1; - } - archive_reader->close(); - } } } diff --git a/components/core/src/clp_s/search/kql/CMakeLists.txt b/components/core/src/clp_s/search/kql/CMakeLists.txt index ee36ee124..9dba44a4b 100644 --- a/components/core/src/clp_s/search/kql/CMakeLists.txt +++ b/components/core/src/clp_s/search/kql/CMakeLists.txt @@ -26,3 +26,4 @@ add_library( target_compile_features(kql PRIVATE cxx_std_20) target_include_directories(kql PRIVATE ${ANTLR_KqlParser_OUTPUT_DIR}) target_link_libraries(kql PRIVATE antlr4_static Boost::filesystem) + diff --git a/components/core/tests/test-clp_s-end_to_end.cpp b/components/core/tests/test-clp_s-end_to_end.cpp index 3f138b472..259b46b93 100644 --- a/components/core/tests/test-clp_s-end_to_end.cpp +++ b/components/core/tests/test-clp_s-end_to_end.cpp @@ -9,6 +9,7 @@ #include #include +#include 
"../src/clp_s/InputConfig.hpp" #include "../src/clp_s/JsonConstructor.hpp" #include "../src/clp_s/JsonParser.hpp" @@ -70,7 +71,10 @@ void compress(bool structurize_arrays) { REQUIRE((std::filesystem::is_directory(cTestEndToEndArchiveDirectory))); clp_s::JsonParserOption parser_option{}; - parser_option.file_paths.push_back(get_test_input_local_path()); + parser_option.input_paths.emplace_back(clp_s::Path{ + .source = clp_s::InputSource::Filesystem, + .path = get_test_input_local_path() + }); parser_option.archives_dir = cTestEndToEndArchiveDirectory; parser_option.target_encoded_size = cDefaultTargetEncodedSize; parser_option.max_document_size = cDefaultMaxDocumentSize; @@ -94,17 +98,19 @@ auto extract() -> std::filesystem::path { REQUIRE(std::filesystem::is_directory(cTestEndToEndOutputDirectory)); clp_s::JsonConstructorOption constructor_option{}; - constructor_option.archives_dir = cTestEndToEndArchiveDirectory; constructor_option.output_dir = cTestEndToEndOutputDirectory; constructor_option.ordered = cDefaultOrdered; constructor_option.target_ordered_chunk_size = cDefaultTargetOrderedChunkSize; - for (auto const& entry : std::filesystem::directory_iterator(constructor_option.archives_dir)) { + for (auto const& entry : std::filesystem::directory_iterator(cTestEndToEndArchiveDirectory)) { if (false == entry.is_directory()) { // Skip non-directories continue; } - constructor_option.archive_id = entry.path().filename(); + constructor_option.archive_path = clp_s::Path{ + .source{clp_s::InputSource::Filesystem}, + .path{entry.path().string()} + }; clp_s::JsonConstructor constructor{constructor_option}; constructor.store(); }