diff --git a/collector/lib/Pipeline.cpp b/collector/lib/Pipeline.cpp new file mode 100644 index 0000000000..9fdb2eb755 --- /dev/null +++ b/collector/lib/Pipeline.cpp @@ -0,0 +1,2 @@ + +#include "Pipeline.h" diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h new file mode 100644 index 0000000000..0f39db8a40 --- /dev/null +++ b/collector/lib/Pipeline.h @@ -0,0 +1,196 @@ +#ifndef _COLLECTOR_PIPELINE_H +#define _COLLECTOR_PIPELINE_H + +#include +#include +#include +#include +#include +#include +#include + +#include "StoppableThread.h" + +namespace collector { + +template +class Queue { + public: + Queue() {} + ~Queue() {} + + bool empty() { + auto lock = read_lock(); + return inner_.empty(); + } + + size_t size() { + auto lock = read_lock(); + return inner_.size(); + } + + void push(const T& elem) { + { + auto lock = write_lock(); + auto e = elem; + inner_.push(std::move(e)); + } + state_changed_.notify_one(); + } + + void push(T&& elem) { + { + auto lock = write_lock(); + inner_.push(elem); + } + state_changed_.notify_one(); + } + + std::optional pop(std::chrono::milliseconds wait_max = std::chrono::milliseconds(10)) { + auto lock = write_lock(); + if (inner_.empty()) { + auto pred = [this]() { + return !inner_.empty(); + }; + + if (!state_changed_.wait_for(lock, wait_max, pred)) { + return std::nullopt; + } + } + T data = inner_.front(); + inner_.pop(); + return {data}; + } + + std::shared_lock read_lock() const { + return std::shared_lock(mx_); + } + + std::unique_lock write_lock() const { + return std::unique_lock(mx_); + } + + private: + std::queue inner_; + + mutable std::shared_mutex mx_; + mutable std::condition_variable_any state_changed_; +}; + +template +class Producer { + public: + Producer(std::shared_ptr>& output) : output_(output) {} + + ~Producer() { + if (thread_.running()) { + Stop(); + } + } + + virtual std::optional next() = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = next(); + if (!event.has_value()) { + break; + } + output_->push(event.value()); + } + } + + protected: + std::shared_ptr>& output_; + StoppableThread thread_; +}; + +template +class Consumer { + public: + Consumer(std::shared_ptr>& input) : input_(input) {} + + ~Consumer() { + if (thread_.running()) { + Stop(); + } + } + + virtual void handle(const T& event) = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + if (!event.has_value()) { + continue; + } + handle(event.value()); + } + } + + protected: + std::shared_ptr>& input_; + StoppableThread thread_; +}; + +template +class Transformer { + public: + Transformer(std::shared_ptr>& input, std::shared_ptr>& output) + : input_(input), output_(output) {} + + ~Transformer() { Stop(); } + + virtual std::optional transform(const In& event) = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + if (!event.has_value()) { + continue; + } + + auto transformed = transform(event.value()); + if (transformed.has_value()) { + output_.push(transformed.value()); + } + } + } + + protected: + std::shared_ptr> input_; + std::shared_ptr> output_; + + StoppableThread thread_; +}; + +template +using Filter = Transformer; + +} // namespace collector + +#endif diff --git a/collector/lib/StoppableThread.cpp b/collector/lib/StoppableThread.cpp index 13e05444d2..97e2557d38 100644 --- a/collector/lib/StoppableThread.cpp +++ b/collector/lib/StoppableThread.cpp @@ -1,6 +1,8 @@ #include "StoppableThread.h" +#include #include +#include #include #include "Utility.h" @@ -44,6 +46,10 @@ void StoppableThread::Stop() { } break; } + if (!thread_->joinable()) { + CLOG(WARNING) << "thread not yet joinable..."; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } thread_->join(); thread_.reset(); int rv = close(stop_pipe_[0]); diff --git a/collector/test/PipelineTests.cpp b/collector/test/PipelineTests.cpp new file mode 100644 index 0000000000..a807ecc64a --- /dev/null +++ b/collector/test/PipelineTests.cpp @@ -0,0 +1,72 @@ +#include +#include +#include +#include +#include +#include + +#include "Pipeline.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace collector { + +class IntProducer : public Producer { + public: + IntProducer(std::shared_ptr>& input, int limit) : Producer(input), limit_(limit) {} + + std::optional next() override { + n_++; + if (n_ > limit_) { + return std::nullopt; + } + return {n_}; + } + + private: + int n_ = 0; + int limit_; +}; + +class IntConsumer : public Consumer { + public: + IntConsumer(std::shared_ptr>& input, std::vector& output) : Consumer(input), events_(output) {} + + void handle(const int& event) override { + events_.push_back(event); + } + + private: + std::vector& events_; +}; + +class EvenIntFilter : public Filter { + public: + std::optional transform(const int& event) { + if (event % 2 == 0) { + return {event}; + } + return std::nullopt; + } +}; + +TEST(PipelineTests, TestBasic) { + std::shared_ptr> queue = std::make_shared>(); + + std::vector output; + + std::unique_ptr> producer = std::make_unique(queue, 10); + std::unique_ptr> consumer = std::make_unique(queue, output); + + producer->Start(); + consumer->Start(); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + consumer->Stop(); + producer->Stop(); + + EXPECT_TRUE(output.size() == 10); +} + +} // namespace collector