Skip to content

Commit

Permalink
Separate out minifi-api/extension-utils/utils from libminifi
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdebreceni authored and martinzink committed Nov 25, 2024
1 parent db574c8 commit 37eedc3
Show file tree
Hide file tree
Showing 786 changed files with 13,172 additions and 5,931 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ endfunction()
SET(TEST_DIR ${CMAKE_SOURCE_DIR}/libminifi/test)
include(Extensions)

add_subdirectory(core)
add_subdirectory(utils)
add_subdirectory(extension-utils)
add_subdirectory(libminifi)

if (ENABLE_ALL OR ENABLE_AZURE)
Expand Down
6 changes: 3 additions & 3 deletions controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::shared_ptr<minifi::core::controller::ControllerService> getControllerServic
const std::string &service_name) {
std::string nifi_configuration_class_name = "adaptiveconfiguration";

minifi::core::extension::ExtensionManager::get().initialize(configuration);
minifi::core::extension::ExtensionManagerImpl::get().initialize(configuration);

configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
auto flow_configuration = minifi::core::createFlowConfiguration(
Expand Down Expand Up @@ -78,7 +78,7 @@ std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(con
if (nullptr == secure_context) {
std::string secureStr;
if (configuration->get(minifi::Configure::nifi_remote_input_secure, secureStr) && minifi::utils::string::toBool(secureStr).value_or(false)) {
secure_context = std::make_shared<minifi::controllers::SSLContextService>("ControllerSocketProtocolSSL", configuration);
secure_context = std::make_shared<minifi::controllers::SSLContextServiceImpl>("ControllerSocketProtocolSSL", configuration);
secure_context->onEnable();
}
} else {
Expand All @@ -96,7 +96,7 @@ int main(int argc, char **argv) {
return -1;
}

const auto configuration = std::make_shared<minifi::Configure>();
const auto configuration = std::make_shared<minifi::ConfigureImpl>();
configuration->setHome(minifi_home);
configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);

Expand Down
19 changes: 10 additions & 9 deletions controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "controllers/SSLContextService.h"
#include "utils/StringUtils.h"
#include "state/UpdateController.h"
#include "core/state/nodes/ResponseNodeLoader.h"

using namespace std::literals::chrono_literals;

Expand Down Expand Up @@ -201,10 +202,10 @@ class TestControllerSocketReporter : public c2::ControllerSocketReporter {
}
};

class TestControllerServiceProvider : public core::controller::ControllerServiceProvider {
class TestControllerServiceProvider : public core::controller::ControllerServiceProviderImpl {
public:
explicit TestControllerServiceProvider(std::shared_ptr<controllers::SSLContextService> ssl_context_service)
: core::controller::ControllerServiceProvider("TestControllerServiceProvider"),
: core::controller::ControllerServiceProviderImpl("TestControllerServiceProvider"),
ssl_context_service_(std::move(ssl_context_service)) {
}
std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string&) const override {
Expand Down Expand Up @@ -239,7 +240,7 @@ class ControllerTestFixture {
};

ControllerTestFixture()
: configuration_(std::make_shared<minifi::Configure>()),
: configuration_(std::make_shared<minifi::ConfigureImpl>()),
controller_(std::make_shared<TestStateController>()),
update_sink_(std::make_unique<TestUpdateSink>(controller_)) {
configuration_->set(minifi::Configure::controller_socket_host, "localhost");
Expand All @@ -249,7 +250,7 @@ class ControllerTestFixture {
configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, "abcdefgh");
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, (minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "root-ca.pem").string());
configuration_->set(minifi::Configure::controller_ssl_context_service, "SSLContextService");
ssl_context_service_ = std::make_shared<controllers::SSLContextService>("SSLContextService", configuration_);
ssl_context_service_ = std::make_shared<controllers::SSLContextServiceImpl>("SSLContextService", configuration_);
ssl_context_service_->onEnable();
controller_service_provider_ = std::make_unique<TestControllerServiceProvider>(ssl_context_service_);
controller_socket_data_.host = "localhost";
Expand Down Expand Up @@ -478,7 +479,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test manifest getter", "[controllerTest
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand All @@ -501,7 +502,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests]
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand Down Expand Up @@ -529,7 +530,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", "[controller
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand All @@ -543,7 +544,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to non-exi
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand All @@ -557,7 +558,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand Down
3 changes: 3 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_library(minifi-core INTERFACE)
target_include_directories(minifi-core INTERFACE include)
target_link_libraries(minifi-core INTERFACE gsl-lite)
75 changes: 75 additions & 0 deletions core/include/minifi-cpp/Connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* @file Connection.h
* Connection class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <set>
#include <string>
#include <vector>
#include <map>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <utility>
#include "core/Core.h"
#include "core/Connectable.h"
#include "core/logging/Logger.h"
#include "core/Relationship.h"
#include "core/FlowFile.h"
#include "core/Repository.h"
#include "minifi-cpp/utils/Literals.h"

namespace org::apache::nifi::minifi {

class Connection : public virtual core::Connectable {
public:
~Connection() override = default;

static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_COUNT = 2000;
static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE = 100_MB;

virtual void setSourceUUID(const utils::Identifier &uuid) = 0;
virtual void setDestinationUUID(const utils::Identifier &uuid) = 0;
virtual utils::Identifier getSourceUUID() const = 0;
virtual utils::Identifier getDestinationUUID() const = 0;
virtual void setSource(core::Connectable* source) = 0;
virtual core::Connectable* getSource() const = 0;
virtual void setDestination(core::Connectable* dest) = 0;
virtual core::Connectable* getDestination() const = 0;
virtual void addRelationship(core::Relationship relationship) = 0;
virtual const std::set<core::Relationship> &getRelationships() const = 0;
virtual void setBackpressureThresholdCount(uint64_t size) = 0;
virtual uint64_t getBackpressureThresholdCount() const = 0;
virtual void setBackpressureThresholdDataSize(uint64_t size) = 0;
virtual uint64_t getBackpressureThresholdDataSize() const = 0;
virtual void setSwapThreshold(uint64_t size) = 0;
virtual void setFlowExpirationDuration(std::chrono::milliseconds duration) = 0;
virtual std::chrono::milliseconds getFlowExpirationDuration() const = 0;
virtual void setDropEmptyFlowFiles(bool drop) = 0;
virtual bool getDropEmptyFlowFiles() const = 0;
virtual bool isEmpty() const = 0;
virtual bool backpressureThresholdReached() const = 0;
virtual uint64_t getQueueSize() const = 0;
virtual uint64_t getQueueDataSize() = 0;
virtual void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) = 0;
virtual std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) = 0;
virtual void drain(bool delete_permanently) = 0;
};
} // namespace org::apache::nifi::minifi
File renamed without changes.
55 changes: 55 additions & 0 deletions core/include/minifi-cpp/FlowFileRecord.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* @file FlowFileRecord.h
* Flow file record class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <atomic>
#include <iostream>
#include <sstream>
#include <fstream>
#include <set>
#include "minifi-cpp/core/ContentRepository.h"
#include "minifi-cpp/core/FlowFile.h"
#include "minifi-cpp/core/Repository.h"
#include "io/OutputStream.h"

namespace org::apache::nifi::minifi {

class FlowFileRecord : public virtual core::FlowFile {
public:
virtual bool Serialize(io::OutputStream &outStream) = 0;

//! Serialize and Persistent to the repository
virtual bool Persist(const std::shared_ptr<core::Repository>& flowRepository) = 0;

static std::shared_ptr<FlowFileRecord> DeSerialize(std::span<const std::byte> buffer, const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container);
static std::shared_ptr<FlowFileRecord> DeSerialize(io::InputStream &stream, const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container);
static std::shared_ptr<FlowFileRecord> DeSerialize(const std::string& key, const std::shared_ptr<core::Repository>& flowRepository,
const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container);

virtual std::string getContentFullPath() const = 0;
};

} // namespace org::apache::nifi::minifi
61 changes: 61 additions & 0 deletions core/include/minifi-cpp/ResourceClaim.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <vector>
#include <queue>
#include <map>
#include <memory>
#include <mutex>
#include <atomic>
#include "core/Core.h"
#include "core/StreamManager.h"
#include "properties/Configure.h"
#include "utils/Id.h"

namespace org::apache::nifi::minifi {

namespace core {
class ContentRepository;
} // namespace core

class ResourceClaim {
public:
using Path = std::string;

virtual ~ResourceClaim() = default;
virtual void increaseFlowFileRecordOwnedCount() = 0;
virtual void decreaseFlowFileRecordOwnedCount() = 0;
virtual uint64_t getFlowFileRecordOwnedCount() = 0;
virtual Path getContentFullPath() const = 0;
virtual bool exists() = 0;

static std::shared_ptr<ResourceClaim> create(std::shared_ptr<core::ContentRepository> repository);

virtual std::ostream& write(std::ostream& stream) const = 0;

friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) {
return claim.write(stream);
}

friend std::ostream& operator<<(std::ostream& stream, const std::shared_ptr<ResourceClaim>& claim) {
return claim->write(stream);
}
};

} // namespace org::apache::nifi::minifi
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <vector>
#include <memory>

#include "core/FlowFile.h"
#include "minifi-cpp/core/FlowFile.h"
#include "utils/Id.h"

namespace org::apache::nifi::minifi {
Expand Down
71 changes: 71 additions & 0 deletions core/include/minifi-cpp/agent/agent_docs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <map>
#include <string>
#include <utility>
#include <vector>

#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/DynamicProperty.h"
#include "minifi-cpp/core/OutputAttributeDefinition.h"
#include "minifi-cpp/core/Property.h"
#include "minifi-cpp/core/Relationship.h"
#include "minifi-cpp/core/RelationshipDefinition.h"

namespace org::apache::nifi::minifi {

enum class ResourceType {
Processor, ControllerService, InternalResource, DescriptionOnly
};

struct ClassDescription {
ResourceType type_ = ResourceType::Processor;
std::string short_name_{};
std::string full_name_{};
std::string description_{};
std::vector<core::Property> class_properties_{};
std::span<const core::DynamicProperty> dynamic_properties_{};
std::vector<core::Relationship> class_relationships_{};
std::span<const core::OutputAttributeReference> output_attributes_{};
bool supports_dynamic_properties_ = false;
bool supports_dynamic_relationships_ = false;
std::string inputRequirement_{};
bool isSingleThreaded_ = false;
};

struct Components {
std::vector<ClassDescription> processors_;
std::vector<ClassDescription> controller_services_;
std::vector<ClassDescription> other_components_;

[[nodiscard]] bool empty() const noexcept {
return processors_.empty() && controller_services_.empty() && other_components_.empty();
}
};

class AgentDocs {
public:
static const std::map<std::string, Components>& getClassDescriptions();
static std::map<std::string, Components>& getMutableClassDescriptions();

template<typename Class, ResourceType Type>
static void createClassDescription(const std::string& group, const std::string& name);
};

} // namespace org::apache::nifi::minifi
Loading

0 comments on commit 37eedc3

Please sign in to comment.