Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimentation with execution pipelines #1947

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions collector/lib/Pipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

#include "Pipeline.h"
196 changes: 196 additions & 0 deletions collector/lib/Pipeline.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#ifndef _COLLECTOR_PIPELINE_H
#define _COLLECTOR_PIPELINE_H

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <shared_mutex>

#include "StoppableThread.h"

namespace collector {

template <typename T>
class Queue {
public:
Queue() {}
~Queue() {}

bool empty() {
auto lock = read_lock();
return inner_.empty();
}

size_t size() {
auto lock = read_lock();
return inner_.size();
}

void push(const T& elem) {
{
auto lock = write_lock();
auto e = elem;
inner_.push(std::move(e));
}
state_changed_.notify_one();
}

void push(T&& elem) {
{
auto lock = write_lock();
inner_.push(elem);
}
state_changed_.notify_one();
}

std::optional<T> pop(std::chrono::milliseconds wait_max = std::chrono::milliseconds(10)) {
auto lock = write_lock();
if (inner_.empty()) {
auto pred = [this]() {
return !inner_.empty();
};

if (!state_changed_.wait_for(lock, wait_max, pred)) {
return std::nullopt;
}
}
T data = inner_.front();
inner_.pop();
return {data};
}

std::shared_lock<std::shared_mutex> read_lock() const {
return std::shared_lock(mx_);
}

std::unique_lock<std::shared_mutex> write_lock() const {
return std::unique_lock(mx_);
}

private:
std::queue<T> inner_;

mutable std::shared_mutex mx_;
mutable std::condition_variable_any state_changed_;
};

template <class T>
class Producer {
public:
Producer(std::shared_ptr<Queue<T>>& output) : output_(output) {}

~Producer() {
if (thread_.running()) {
Stop();
}
}

virtual std::optional<T> next() = 0;

void Start() {
thread_.Start([this] { Run(); });
}

void Stop() {
thread_.Stop();
}

void Run() {
while (!thread_.should_stop()) {
auto event = next();
if (!event.has_value()) {
break;
}
output_->push(event.value());
}
}

protected:
std::shared_ptr<Queue<T>>& output_;
StoppableThread thread_;
};

template <class T>
class Consumer {
public:
Consumer(std::shared_ptr<Queue<T>>& input) : input_(input) {}

~Consumer() {
if (thread_.running()) {
Stop();
}
}

virtual void handle(const T& event) = 0;

void Start() {
thread_.Start([this] { Run(); });
}

void Stop() {
thread_.Stop();
}

void Run() {
while (!thread_.should_stop()) {
auto event = input_->pop();
if (!event.has_value()) {
continue;
}
handle(event.value());
}
}

protected:
std::shared_ptr<Queue<T>>& input_;
StoppableThread thread_;
};

template <class In, class Out>
class Transformer {
public:
Transformer(std::shared_ptr<Queue<In>>& input, std::shared_ptr<Queue<Out>>& output)
: input_(input), output_(output) {}

~Transformer() { Stop(); }

virtual std::optional<Out> transform(const In& event) = 0;

void Start() {
thread_.Start([this] { Run(); });
}

void Stop() {
thread_.Stop();
}

void Run() {
while (!thread_.should_stop()) {
auto event = input_->pop();
if (!event.has_value()) {
continue;
}

auto transformed = transform(event.value());
if (transformed.has_value()) {
output_.push(transformed.value());
}
}
}

protected:
std::shared_ptr<Queue<In>> input_;
std::shared_ptr<Queue<Out>> output_;

StoppableThread thread_;
};

template <class T>
using Filter = Transformer<T, T>;

} // namespace collector

#endif
6 changes: 6 additions & 0 deletions collector/lib/StoppableThread.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "StoppableThread.h"

#include <chrono>
#include <iostream>
#include <thread>
#include <unistd.h>

#include "Utility.h"
Expand Down Expand Up @@ -44,6 +46,10 @@ void StoppableThread::Stop() {
}
break;
}
if (!thread_->joinable()) {
CLOG(WARNING) << "thread not yet joinable...";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
thread_->join();
thread_.reset();
int rv = close(stop_pipe_[0]);
Expand Down
72 changes: 72 additions & 0 deletions collector/test/PipelineTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <chrono>
#include <memory>
#include <optional>
#include <ratio>
#include <thread>
#include <vector>

#include "Pipeline.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace collector {

class IntProducer : public Producer<int> {
public:
IntProducer(std::shared_ptr<Queue<int>>& input, int limit) : Producer(input), limit_(limit) {}

std::optional<int> next() override {
n_++;
if (n_ > limit_) {
return std::nullopt;
}
return {n_};
}

private:
int n_ = 0;
int limit_;
};

class IntConsumer : public Consumer<int> {
public:
IntConsumer(std::shared_ptr<Queue<int>>& input, std::vector<int>& output) : Consumer(input), events_(output) {}

void handle(const int& event) override {
events_.push_back(event);
}

private:
std::vector<int>& events_;
};

class EvenIntFilter : public Filter<int> {
public:
std::optional<int> transform(const int& event) {
if (event % 2 == 0) {
return {event};
}
return std::nullopt;
}
};

TEST(PipelineTests, TestBasic) {
std::shared_ptr<Queue<int>> queue = std::make_shared<Queue<int>>();

std::vector<int> output;

std::unique_ptr<Producer<int>> producer = std::make_unique<IntProducer>(queue, 10);
std::unique_ptr<Consumer<int>> consumer = std::make_unique<IntConsumer>(queue, output);

producer->Start();
consumer->Start();

std::this_thread::sleep_for(std::chrono::milliseconds(200));

consumer->Stop();
producer->Stop();

EXPECT_TRUE(output.size() == 10);
}

} // namespace collector
Loading