Skip to content

Commit

Permalink
adds concurrent queue so writes are thread safe to one file
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveBronder committed Dec 18, 2024
1 parent 814f8df commit e484fa1
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 84 deletions.
23 changes: 21 additions & 2 deletions src/stan/services/pathfinder/multi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ inline void gen_pathfinder_draw(ConstrainFun&& constrain_fun, Model&& model,
}
}
}

template <typename Writer>
struct concurrent_writer {
std::reference_wrapper<Writer> writer;
std::reference_wrapper<std::mutex> mut_;
explicit concurrent_writer(std::mutex& mut, Writer& writer) :
writer(writer), mut_(mut) {}
template <typename T>
void operator()(T&& t) {
std::lock_guard<std::mutex> lock(mut_.get());
writer.get()(t);
}
void operator()() {
std::lock_guard<std::mutex> lock(mut_.get());
writer.get()();
}
};
} // namespace internal

/**
Expand Down Expand Up @@ -201,8 +218,10 @@ inline int pathfinder_lbfgs_multi(
elbo_estimates.push_back(std::move(std::get<1>(pathfinder_ret)));
} else {
// For no psis, have single write to both single and multi writers
stan::callbacks::multi_stream_writer<SingleParamWriter, ParamWriter> multi_param_writer(
single_path_parameter_writer[iter], parameter_writer);
using multi_writer = stan::callbacks::multi_stream_writer<SingleParamWriter, internal::concurrent_writer<ParamWriter>>;
internal::concurrent_writer safe_write{write_mutex, parameter_writer};
multi_writer multi_param_writer(
single_path_parameter_writer[iter], safe_write);
auto pathfinder_ret
= stan::services::pathfinder::pathfinder_lbfgs_single<true>(
model, *(init[iter]), random_seed, stride_id + iter,
Expand Down
4 changes: 2 additions & 2 deletions src/stan/services/pathfinder/single.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,13 +872,13 @@ inline auto pathfinder_lbfgs_single(
error_codes::OK, std::move(est_draws), num_evals + est_draws.fn_calls);
} else {
Eigen::Matrix<double, 1, Eigen::Dynamic> constrained_draws_vec(names.size());
constrained_draws_vec(2) = stride_id;
constrained_draws_vec(2) = stride_id - (ReturnLpSamples ? 1 : 0);
Eigen::Array<double, Eigen::Dynamic, 1> lp_ratio;
auto&& elbo_draws = elbo_best.repeat_draws;
auto&& elbo_lp_ratio = elbo_best.lp_ratio;
auto&& elbo_lp_mat = elbo_best.lp_mat;
const int remaining_draws = num_draws - elbo_lp_ratio.rows();
const Eigen::Index num_unconstrained_params = names.size() - 2;
const Eigen::Index num_unconstrained_params = names.size() - 3;
if (likely(remaining_draws > 0)) {
try {
internal::elbo_est_t est_draws = internal::est_approx_draws<false>(
Expand Down
51 changes: 22 additions & 29 deletions src/test/unit/services/pathfinder/eight_schools_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ TEST_F(ServicesPathfinderEightSchools, multi) {
// bool save_iterations = true;
constexpr int refresh = 1;
constexpr bool save_iterations = false;
constexpr bool calculate_lp = true;
constexpr bool resample = true;
std::unique_ptr<std::ostream> empty_ostream(nullptr);
stan::test::test_logger logger(std::move(empty_ostream));
std::vector<stan::callbacks::writer> single_path_parameter_writer(num_paths);
Expand All @@ -101,24 +103,22 @@ TEST_F(ServicesPathfinderEightSchools, multi) {
save_iterations, refresh, callback, logger,
std::vector<stan::callbacks::stream_writer>(num_paths, init),
single_path_parameter_writer, single_path_diagnostic_writer, parameter,
diagnostics);

Eigen::MatrixXd param_vals = parameter.get_eigen_state_values();
diagnostics, calculate_lp, resample);

Eigen::IOFormat CommaInitFmt(Eigen::StreamPrecision, 0, ", ", ", ", "\n", "",
"", "");

Eigen::RowVectorXd mean_vals = param_vals.colwise().mean().eval()(param_indices);
Eigen::RowVectorXd sd_vals = (((param_vals(Eigen::all, param_indices).rowwise() - mean_vals)
.array()
.square()
.matrix()
.colwise()
.sum()
.array()
/ (param_vals.rows() - 1))
.sqrt()).eval();

Eigen::MatrixXd param_vals = parameter.get_eigen_state_values();
EXPECT_EQ(param_vals.cols(), 21);
EXPECT_EQ(param_vals.rows(), 10000);
// They can be in any order and any number
for (Eigen::Index i = 0; i < num_multi_draws; i++) {
EXPECT_GE(param_vals.col(2)(i), 0);
EXPECT_LE(param_vals.col(2)(i), num_paths - 1);
}
auto param_tmp = param_vals(Eigen::all, param_indices);
auto mean_sd_pair = stan::test::get_mean_sd(param_tmp);
auto&& mean_vals = mean_sd_pair.first;
auto&& sd_vals = mean_sd_pair.second;
Eigen::RowVectorXd r_mean_vals(20);
r_mean_vals << -17.9537, -47.016, 1.89104, 3.66449, 0.22256, 0.119645,
-0.146812, 0.23633, -0.244868, -0.227134, 0.504507, 0.0476979, 3.66491,
Expand Down Expand Up @@ -173,19 +173,13 @@ TEST_F(ServicesPathfinderEightSchools, single) {
"", "");

Eigen::MatrixXd param_vals = parameter.get_eigen_state_values();

Eigen::RowVectorXd mean_vals = param_vals.colwise().mean().eval()(param_indices);
Eigen::RowVectorXd sd_vals = ((((param_vals(Eigen::all,param_indices).rowwise() - mean_vals)
.array()
.square()
.matrix()
.rowwise()
.sum()
.array()
/ (param_vals.rows() - 1))
.sqrt())
.transpose()
.eval());
for (auto&& x_i : param_vals.col(2)) {
EXPECT_EQ(x_i, stride_id);
}
auto param_tmp = param_vals(Eigen::all, param_indices);
auto mean_sd_pair = stan::test::get_mean_sd(param_tmp);
auto&& mean_vals = mean_sd_pair.first;
auto&& sd_vals = mean_sd_pair.second;

Eigen::MatrixXd r_answer = stan::test::eight_schools_r_answer();

Expand Down Expand Up @@ -261,7 +255,6 @@ TEST_F(ServicesPathfinderEightSchools, single) {
all_mean_vals.row(0) = mean_vals;
all_mean_vals.row(1) = mean_r_vals;
all_mean_vals.row(2) = mean_vals - mean_r_vals;

Eigen::MatrixXd all_sd_vals(3, 20);
all_sd_vals.row(0) = sd_vals;
all_sd_vals.row(1) = sd_r_vals;
Expand Down
Loading

0 comments on commit e484fa1

Please sign in to comment.