Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] first version of DynamicAlgorithm class + demo algorithm to demonstrate #395

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions examples/dynamic_algorithm_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import desbordante
import pandas

# CSV dataset the demo algorithm is initialised with.
TABLE = 'examples/datasets/pfd.csv'

# Create the demo dynamic algorithm on top of the initial table.
# NOTE(review): the tuple presumably means (path, separator, has_header) - confirm against the bindings.
algo = desbordante.dynamic.algorithms.Default(table=(TABLE, ',', True))

def print_result(algo):
    """Print the algorithm's current result followed by the last result diff.

    Result lines are ordered by the integer found in the second
    space-separated token of the text before the first comma; ties are
    broken by the line itself. The "added" and "removed" sections are only
    printed when they are non-empty.
    """
    def sort_key(line):
        head = line.split(',')[0]
        return int(head.split(' ')[1]), line

    for line in sorted(algo.get_result(), key=sort_key):
        print(' ', line)

    added, removed = algo.get_result_diff()
    sections = (
        (' Added into result:', added),
        (' Removed from result:', removed),
    )
    for title, rows in sections:
        if len(rows) == 0:
            continue
        print(title)
        for row in rows:
            print(' ', row)

# Show the result obtained from the initial table.
print('Init result:')
print_result(algo)

# Insert two new rows, passed as a DataFrame with the table's columns.
df1 = pandas.DataFrame({'X': [100, 101], 'Y': [100, 101]})
algo.process(insert=df1)
print('Insert test result:')
print_result(algo)

# Delete rows by their ids.
algo.process(delete=[3, 4, 12])
print('Delete test result:')
print_result(algo)

# Update: update_old lists the ids of the rows being replaced and update_new
# carries the replacement rows (one replacement per listed id).
df2 = pandas.DataFrame({'X': [4], 'Y': [6]})
algo.process(update_old=[6], update_new=df2)
print('Update test result:')
print_result(algo)

# All operation kinds combined in a single batch.
df3 = pandas.DataFrame({'X': [100, 100, 102], 'Y': [200, 200, 202]})
df4 = pandas.DataFrame({'X': [102, 105], 'Y': [202, 205]})
algo.process(insert=df3, update_new=df4, delete=[1, 2, 13], update_old=[5, 7])
print('Mixed test result:')
print_result(algo)
4 changes: 4 additions & 0 deletions src/core/algorithms/algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cassert>

#include "config/exceptions.h"
#include "algorithm.h"

namespace algos {

Expand Down Expand Up @@ -146,4 +147,7 @@ std::string_view Algorithm::GetDescription(std::string_view option_name) const {
}
}

// Looks up `option_name` in possible_options_ and forwards to that option's IsSet().
// The lookup goes through at(), so an unknown name is reported by whatever at() throws
// (std::out_of_range if possible_options_ is a standard map).
bool Algorithm::IsOptionSet(std::string_view option_name) const {
    auto const& option = possible_options_.at(option_name);
    return option->IsSet();
}
} // namespace algos
1 change: 1 addition & 0 deletions src/core/algorithms/algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class Algorithm {

[[nodiscard]] std::unordered_set<std::string_view> GetPossibleOptions() const;
[[nodiscard]] std::string_view GetDescription(std::string_view option_name) const;
[[nodiscard]] bool IsOptionSet(std::string_view option_name) const;

std::unordered_map<std::string_view, config::OptValue> GetOptValues() const {
std::unordered_map<std::string_view, config::OptValue> opt_values;
Expand Down
47 changes: 47 additions & 0 deletions src/core/algorithms/dynamic/demo/demo_algo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "demo_algo.h"

#include "config/ioption.h"
#include "config/names.h"

using config::InputTable;
using config::names::kTable;

// Constructs the demo algorithm; the phase names are passed on to the DynamicAlgorithm base
// unchanged and no further setup is done here.
algos::DynamicAlgorithmDemo::DynamicAlgorithmDemo(std::vector<std::string_view> phase_names)
: DynamicAlgorithm(phase_names) {}

// Applies the pending batch to the demo result and returns the elapsed time in milliseconds.
//
// Both diff containers are cleared first, so afterwards they describe this batch only.
// Every id in delete_statements_ is erased from the result and whatever Erase() returns is
// recorded in erased_from_result_. Every row in insert_statements_ is registered in
// table_rows_ids_, added to the result and recorded in added_to_result_.
unsigned long long algos::DynamicAlgorithmDemo::ProcessBatch() {
    // A monotonic clock is used for the measurement: the wall clock can be adjusted while
    // the batch is running, which would distort the reported duration.
    auto const start_time = std::chrono::steady_clock::now();
    added_to_result_.Clear();
    erased_from_result_.Clear();
    for (size_t row_id : delete_statements_) {
        erased_from_result_.Add(result_collection_.Erase({row_id}));
    }
    for (TableRow row : insert_statements_.AsUnorderedMultiset()) {
        table_rows_ids_.emplace(row.getId());
        result_collection_.Add(row);
        added_to_result_.Add(row);
    }
    // NOTE(review): one-second pause, presumably a stand-in for real computation in this demo.
    sleep(1);
    auto const elapsed_milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start_time);
    return elapsed_milliseconds.count();
}

// Runs the base-class loading step, then drains input_table_: each row's id is stored in
// table_rows_ids_ and the row itself is added to the result collection.
void algos::DynamicAlgorithmDemo::LoadDataInternal() {
    DynamicAlgorithm::LoadDataInternal();

    while (input_table_->HasNextRow()) {
        auto loaded_row = TableRow{input_table_->GetNextRow()};
        table_rows_ids_.emplace(loaded_row.getId());
        result_collection_.Add(loaded_row);
    }

    // NOTE(review): one-second pause, presumably a stand-in for real computation in this demo.
    sleep(1);
}

// Returns the current result collection rendered as strings.
std::vector<std::string> algos::DynamicAlgorithmDemo::GetResult() {
    std::vector<std::string> current_result = result_collection_.AsStringVector();
    return current_result;
}

// Returns the last diff as {rows added to the result, rows erased from the result},
// both rendered as strings.
std::pair<std::vector<std::string>, std::vector<std::string>>
algos::DynamicAlgorithmDemo::GetResultDiff() {
    std::vector<std::string> inserted = added_to_result_.AsStringVector();
    std::vector<std::string> removed = erased_from_result_.AsStringVector();
    return {std::move(inserted), std::move(removed)};
}
33 changes: 33 additions & 0 deletions src/core/algorithms/dynamic/demo/demo_algo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <filesystem>
#include <list>
#include <mutex>

#include "algorithms/dynamic/dynamic_algorithm.h"
#include "config/tabular_data/input_table_type.h"
#include "model/table/table_row.h"
#include "util/dynamic_collection.h"

namespace algos {

// Demo implementation of DynamicAlgorithm: the "result" is simply the set of rows currently
// held, plus the rows added to / erased from it by the most recent batch.
class DynamicAlgorithmDemo : public DynamicAlgorithm {
public:
    DynamicAlgorithmDemo(std::vector<std::string_view> phase_names = {});

    // Current result, one string per row.
    std::vector<std::string> GetResult();
    // Last diff as {added rows, erased rows}, one string per row.
    std::pair<std::vector<std::string>, std::vector<std::string>> GetResultDiff();

private:
    unsigned long long ProcessBatch() final;
    void LoadDataInternal() final;

    // Result for the latest version of the data.
    RowsContainer result_collection_;

    // Diff produced by the last ProcessBatch() call.
    RowsContainer added_to_result_;
    RowsContainer erased_from_result_;
};

} // namespace algos
118 changes: 118 additions & 0 deletions src/core/algorithms/dynamic/dynamic_algorithm.cpp
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like all this class does is bypass the existing configuration infrastructure.

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#include "dynamic_algorithm.h"

#include <easylogging++.h>

#include "config/names_and_descriptions.h"
#include "config/option_using.h"
#include "config/tabular_data/crud_operations/operations.h"
#include "config/tabular_data/input_table/option.h"
#include "config/tabular_data/input_table_type.h"


// Registers every option this class understands and binds each one to the member that
// receives its value: the initial table plus the four batch inputs (insert, delete,
// update-old, update-new).
void algos::DynamicAlgorithm::RegisterOptions() {
RegisterOption(config::kTableOpt(&input_table_));
RegisterOption(config::kInsertStatementsOpt(&insert_batch_));
RegisterOption(config::kDeleteStatementsOpt(&delete_batch_));
RegisterOption(config::kUpdateOldStatementsOpt(&update_old_batch_));
RegisterOption(config::kUpdateNewStatementsOpt(&update_new_batch_));
}

// Forwards the phase names to Algorithm, registers all options and makes only the table
// option available; the batch options are made available in MakeExecuteOptsAvailable().
algos::DynamicAlgorithm::DynamicAlgorithm(std::vector<std::string_view> phase_names)
: Algorithm(phase_names) {
RegisterOptions();
MakeOptionsAvailable({config::names::kTable});
}

// Rejects a table without columns (std::runtime_error) and otherwise marks the algorithm
// as initialized.
void algos::DynamicAlgorithm::LoadDataInternal() {
    auto const column_count = input_table_->GetNumberOfColumns();
    if (column_count == 0) {
        throw std::runtime_error("Unable to work on an empty dataset.");
    }
    is_initialized_ = true;
}

void algos::DynamicAlgorithm::ResetState() {
insert_statements_.Clear();
delete_statements_.clear();
Comment on lines +34 to +35
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be achieved if we just provide empty values as defaults for these options. The user has to set all options before executing the algorithm, so they would inevitably become empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields will not be empty, because they are not options

Copy link
Collaborator

@BUYT-1 BUYT-1 May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, okay. They store the inserted and deleted lines, right? Isn't it wasteful to store them entirely in memory, though? I think these things should be managed by the algorithm.

}

void algos::DynamicAlgorithm::MakeExecuteOptsAvailable() {
if (is_initialized_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check does nothing, existing configuration only calls this method after LoadData has been called.

MakeOptionsAvailable(kCrudOptions);
}
}

void algos::DynamicAlgorithm::AddSpecificNeededOptions(
std::unordered_set<std::string_view>& previous_options) const {
for (const std::string_view& option_name : kCrudOptions) {
previous_options.erase(option_name);
}
Comment on lines +46 to +48
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes these options invisible in a GetNeededOptions call, but they should be visible.

}

void algos::DynamicAlgorithm::ValidateInsertStatements(InputTable& data) {
if (!data) {
return;
}
// The statements must have exactly as many columns as the table. The message is built from
// adjacent string literals: a backslash line continuation inside a literal would pull the
// next source line's indentation into the message text.
if (data->GetNumberOfColumns() != input_table_->GetNumberOfColumns()) {
    throw config::ConfigurationError(
            "Invalid data received: the number of columns in the "
            "modification statements is different from the table.");
}
Comment on lines +55 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Option class allows validation and normalization of values, default values, and it is also possible to have options that are only shown if some condition, which is expected to depend on other option values, holds. The basic checks can be done with those methods. MetricVerifier uses all the Option's methods, so you can look there for an example. But option management shouldn't be in the base class anyway. In particular, some algorithms may not support deletion, yet the option would be visible here.

while (data->HasNextRow()) {
TableRow row{data->GetNextRow()};
if (row.getData().size() != input_table_->GetNumberOfColumns()) {
LOG(WARNING) << "Unexpected number of columns for a row, skipping (expected "
<< input_table_->GetNumberOfColumns() << ", got " << row.getData().size()
<< ")";
continue;
}
insert_statements_.Add(row);
table_rows_ids_.insert(row.getId());
}
}

// Moves the row ids in `data` into delete_statements_ and removes them from table_rows_ids_.
//
// Every id must currently be present in table_rows_ids_ and may occur in `data` only once;
// otherwise config::ConfigurationError is thrown for the first offending id. All ids are
// checked before anything is modified, so a rejected batch leaves table_rows_ids_,
// delete_statements_ and `data` untouched. On success `data` is emptied.
void algos::DynamicAlgorithm::ValidateDeleteStatements(std::vector<size_t>& data) {
    // Pass 1: validation only. A repeated id is rejected too, because the row would no
    // longer exist once its first occurrence has been applied.
    std::unordered_set<size_t> checked_ids;
    for (size_t row_id : data) {
        bool const exists = table_rows_ids_.count(row_id) != 0;
        if (!exists || !checked_ids.insert(row_id).second) {
            throw config::ConfigurationError("Invalid data received: the row with " +
                                             std::to_string(row_id) +
                                             " does not exist in the table.");
        }
    }
    // Pass 2: the whole batch is known to be valid, apply it.
    for (size_t row_id : data) {
        table_rows_ids_.erase(row_id);
        delete_statements_.emplace_back(row_id);
    }
    data.clear();
}

void algos::DynamicAlgorithm::ConfigureBatch() {
// configure update statements
ValidateDeleteStatements(update_old_batch_);
ValidateInsertStatements(update_new_batch_);
if (insert_statements_.Size() != delete_statements_.size()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make more sense to have an object that yields (id, new_data) pairs instead as a single update option in a manner similar to IDatasetStream or just a vector of pairs. This option is one of the cases where checking its value (for example, checking that all the rows have the correct length) completely might not be feasible.

// At this point delete_statements_ holds the ids being replaced (from update_old) and
// insert_statements_ holds their replacements (from update_new); each count is reported
// under its matching label.
throw config::ConfigurationError(
        "Invalid data received: number of rows to update: " +
        std::to_string(delete_statements_.size()) +
        ", number of rows to update with: " + std::to_string(insert_statements_.Size()));
// configure insert statements
ValidateInsertStatements(insert_batch_);
// configure delete statements
ValidateDeleteStatements(delete_batch_);
}

bool algos::DynamicAlgorithm::HasBatch() {
bool result = false;
for (const std::string_view& option_name : kCrudOptions) {
result |= IsOptionSet(option_name);
}
return result;
}
Comment on lines +103 to +109
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already taken care of by the configuration system. Default values can be set by Option to be empty, and we'd just have no rows/IDs/pairs in the relevant field.


unsigned long long algos::DynamicAlgorithm::ExecuteInternal() {
if (HasBatch()) {
ConfigureBatch();
unsigned long long time_ms = ProcessBatch();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessBatch should just be ExecuteInternal for every algorithm (unless there are a lot of common things happening, but that's for the future).

return time_ms;
}
return 0;
}
59 changes: 59 additions & 0 deletions src/core/algorithms/dynamic/dynamic_algorithm.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

#include <filesystem>
#include <list>
#include <mutex>

#include "algorithms/algorithm.h"
#include "config/tabular_data/input_table_type.h"
#include "model/table/table_row.h"
#include "util/dynamic_collection.h"

using RowsContainer = util::DynamicCollection<TableRow>;

namespace algos {
using config::InputTable;

// Base class for algorithms that are loaded with an initial table and then executed
// repeatedly on batches of insert / delete / update statements.
class DynamicAlgorithm : public Algorithm {
public:
// Registers all options and makes only the table option available.
explicit DynamicAlgorithm(std::vector<std::string_view> phase_names);

protected:
// Algorithm-specific processing of the prepared batch (insert_statements_ and
// delete_statements_); the returned value is passed through by ExecuteInternal().
virtual unsigned long long ProcessBatch() = 0;
// Throws std::runtime_error for a table without columns, otherwise marks the
// algorithm as initialized.
void LoadDataInternal() override;
// Prepares and processes the batch when any CRUD option is set; returns 0 otherwise.
unsigned long long ExecuteInternal() final;
// Makes the CRUD options available once the algorithm has been initialized.
void MakeExecuteOptsAvailable() override;

// Clears insert_statements_ and delete_statements_.
void ResetState() override;

// Validated statements of the current batch. Rows from both the insert and the
// update-new inputs end up in insert_statements_; ids from both the delete and the
// update-old inputs end up in delete_statements_.
RowsContainer insert_statements_;
std::vector<size_t> delete_statements_{};

// Initial state of the table.
InputTable input_table_;
// Insert operations stream.
InputTable insert_batch_;
// Ids of the rows to delete.
std::vector<size_t> delete_batch_{};
// Update operations: ids of the rows being replaced.
std::vector<size_t> update_old_batch_{};
// Update operations: stream of replacement rows.
InputTable update_new_batch_;
// Ids of the rows currently considered part of the table; extended while insert
// statements are validated and reduced while delete statements are validated.
std::unordered_set<size_t> table_rows_ids_{};

private:
// Removes the CRUD option names from `previous_options`.
void AddSpecificNeededOptions(
std::unordered_set<std::string_view>& previous_options) const override;
// Checks the column count of `data` against the table and moves its rows into
// insert_statements_; a null `data` is ignored.
void ValidateInsertStatements(InputTable& data);
// Checks that every id exists in table_rows_ids_ and moves the ids into
// delete_statements_; empties `data` on success.
void ValidateDeleteStatements(std::vector<size_t>& data);
// Validates the update statements first, then the insert and delete statements.
void ConfigureBatch();
// Registers the table option and the four CRUD options.
void RegisterOptions();
// True when at least one CRUD option is set.
bool HasBatch();

// Set by LoadDataInternal(); gates MakeExecuteOptsAvailable().
bool is_initialized_ = false;
};

} // namespace algos
6 changes: 6 additions & 0 deletions src/core/config/descriptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ constexpr auto kDIgnoreConstantCols =
constexpr auto kDGraphData = "Path to dot-file with graph";
constexpr auto kDGfdData = "Path to file with GFD";
// Help text for the memory-limit option; the limit is expressed in megabytes.
constexpr auto kDMemLimitMB = "memory limit in MBs";
// Help texts for the batch (CRUD) options of dynamic algorithms.
// Insert: the new rows themselves.
constexpr auto kDInsertStatements = "Rows to be inserted into the table using the insert operation";
// Delete: the option value is a list of row ids.
constexpr auto kDDeleteStatements = "Rows to be deleted from the table using the delete operation";
// Update, old side: the option value is a list of ids of the rows being replaced.
constexpr auto kDUpdateOldStatements =
"Rows that need to be replaced in the table using the update operation";
// Update, new side: the replacement rows.
constexpr auto kDUpdateNewStatements =
"Rows that need to be used to replace old data in the table using the update operation";
constexpr auto kDDifferenceTable = "CSV table containing difference limits for each column";
constexpr auto kDNumRows = "Use only first N rows of the table";
constexpr auto kDNUmColumns = "Use only first N columns of the table";
Expand Down
4 changes: 4 additions & 0 deletions src/core/config/names.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ constexpr auto kIgnoreConstantCols = "ignore_constant_cols";
constexpr auto kGraphData = "graph";
constexpr auto kGfdData = "gfd";
constexpr auto kMemLimitMB = "mem_limit";
// Names of the batch (CRUD) options of dynamic algorithms; the Python demo passes them as
// keyword arguments of process().
constexpr auto kInsertStatements = "insert";
constexpr auto kDeleteStatements = "delete";
constexpr auto kUpdateOldStatements = "update_old";
constexpr auto kUpdateNewStatements = "update_new";
constexpr auto kDifferenceTable = "difference_table";
constexpr auto kNumRows = "num_rows";
constexpr auto kNumColumns = "num_columns";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#include "config/tabular_data/crud_operations/delete/option.h"

#include "config/names_and_descriptions.h"

namespace config {
// Option holding the ids of the rows to delete.
// NOTE(review): the trailing std::nullopt / nullptr / nullptr presumably mean "no default
// value, no normalization, no validation" - confirm against CommonOption's constructor.
extern CommonOption<std::vector<size_t>> const kDeleteStatementsOpt = {
        names::kDeleteStatements, descriptions::kDDeleteStatements, std::nullopt, nullptr,
        nullptr};
}  // namespace config
8 changes: 8 additions & 0 deletions src/core/config/tabular_data/crud_operations/delete/option.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include "config/common_option.h"
#include "config/tabular_data/input_table_type.h"

namespace config {
// Common option for the delete statements of a batch: a list of row ids.
extern CommonOption<std::vector<size_t>> const kDeleteStatementsOpt;
} // namespace config
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#include "config/tabular_data/crud_operations/insert/option.h"

#include "config/names_and_descriptions.h"

namespace config {
// Option holding the stream of rows to insert.
// NOTE(review): the trailing std::nullopt / nullptr / nullptr presumably mean "no default
// value, no normalization, no validation" - confirm against CommonOption's constructor.
extern CommonOption<InputTable> const kInsertStatementsOpt = {
        names::kInsertStatements, descriptions::kDInsertStatements, std::nullopt, nullptr,
        nullptr};
}  // namespace config
8 changes: 8 additions & 0 deletions src/core/config/tabular_data/crud_operations/insert/option.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include "config/common_option.h"
#include "config/tabular_data/input_table_type.h"

namespace config {
// Common option for the insert statements of a batch: an input table with the new rows.
extern CommonOption<InputTable> const kInsertStatementsOpt;
} // namespace config
Loading
Loading