Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming concurrency tests #5437

Merged
merged 6 commits into from
Sep 3, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/io/sparse_bin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ class SparseBin : public Bin {
// Pushes one (row, value) pair into this thread's staging buffer.
// Zeros are skipped: a sparse bin represents them implicitly.
void Push(int tid, data_size_t idx, uint32_t value) override {
  auto cur_bin = static_cast<VAL_T>(value);
  if (cur_bin != 0) {
    // Splitting this operation into separate calls avoids a concurrency exception. Do not simplify.
    // NOTE: the buffer must be bound by reference. The previous
    // `auto buffer = push_buffers_[tid];` copied the per-thread buffer, so
    // emplace_back wrote into a temporary that was immediately destroyed and
    // the pushed value was silently dropped. `auto&` keeps the two-step form
    // (fetch, then push) without losing data.
    auto& buffer = push_buffers_[tid];
    buffer.emplace_back(idx, cur_bin);
  }
}

Expand Down
6 changes: 3 additions & 3 deletions tests/cpp_tests/test_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ void test_stream_sparse(
EXPECT_EQ(0, result) << "LGBM_DatasetCreateFromSampledColumn result code: " << result;

dataset = static_cast<Dataset*>(dataset_handle);
dataset->InitStreaming(nrows, has_weights, has_init_scores, has_queries, nclasses, 1);
dataset->InitStreaming(nrows, has_weights, has_init_scores, has_queries, nclasses, 2);
break;
}

Expand All @@ -198,6 +198,7 @@ void test_stream_sparse(

dataset = static_cast<Dataset*>(dataset_handle);

Log::Info("Streaming sparse dataset, %d rows sparse data with a batch size of %d", nrows, batch_count);
TestUtils::StreamSparseDataset(
dataset_handle,
nrows,
Expand All @@ -213,7 +214,6 @@ void test_stream_sparse(

dataset->FinishLoad();

Log::Info("Streaming sparse dataset, %d rows sparse data with a batch size of %d", nrows, batch_count);
TestUtils::AssertMetadata(&dataset->metadata(),
labels,
weights,
Expand Down Expand Up @@ -320,7 +320,7 @@ TEST(Stream, PushSparseRowsWithMetadata) {
TestUtils::CreateRandomSparseData(nrows, ncols, nclasses, sparse_percent, &indptr, &indices, &vals, &labels, &weights, &init_scores, &groups);

const std::vector<int32_t> batch_counts = { 1, nrows / 100, nrows / 10, nrows };
const std::vector<int8_t> creation_types = { 0, 1 };
const std::vector<int8_t> creation_types = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svotaw
Why are there only zeros here now and not any ones?
Previously creation_types = { 0, 1 }; made sense to me, but now I'm very confused by this test parametrization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StrikerRUS oops, yes need to revert this. It was a quick and dirty experiment just to force many repeats. Some of the early testing failures were only sporadic, so I added this to give me better confidence on no failures. I will fix.

I had planned to do a last pass over the PR to look for things like this, but it got checked in. Not used to someone else being in control of the checkin. :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, thanks a lot for your awesome work on LightGBM!

Luckily, we need to revert just one line of the code. 🙂


for (size_t i = 0; i < creation_types.size(); ++i) { // from sampled data or reference
for (size_t j = 0; j < batch_counts.size(); ++j) {
Expand Down
134 changes: 97 additions & 37 deletions tests/cpp_tests/testutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include <gtest/gtest.h>
#include <string>
#include <thread>
#include <utility>

using LightGBM::Log;
using LightGBM::Random;
Expand Down Expand Up @@ -206,16 +208,16 @@ namespace LightGBM {
}

/*!
 * Streams a sparse (CSR) dataset into dataset_handle in microbatches of
 * batch_count rows, spawning worker threads to exercise the Dataset's
 * concurrent push path (see PushSparseBatch below).
 * NOTE(review): this span is a GitHub diff capture — removed lines, added
 * lines and UI markers are interleaved; it is not compilable as shown.
 */
void TestUtils::StreamSparseDataset(DatasetHandle dataset_handle,
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<int32_t>* indptr,
const std::vector<int32_t>* indices,
const std::vector<double>* values,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups) {
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<int32_t>* indptr,
const std::vector<int32_t>* indices,
const std::vector<double>* values,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups) {
// Defer dataset finalization until the test calls FinishLoad() itself —
// presumably so one thread finishing its pushes cannot finalize the dataset
// while another thread is still streaming. TODO(review): confirm against the
// C API docs for LGBM_DatasetSetWaitForManualFinish.
int result = LGBM_DatasetSetWaitForManualFinish(dataset_handle, 1);
EXPECT_EQ(0, result) << "LGBM_DatasetSetWaitForManualFinish result code: " << result;

Expand All @ @@ -233,42 +235,102 @@
weights_ptr = weights->data();
}

// Since init_scores are in a column format, but need to be pushed as rows, we have to extract each batch
std::vector<double> init_score_batch;
const double* init_scores_ptr = nullptr;
if (init_scores) {
init_score_batch.reserve(nclasses * batch_count);
init_scores_ptr = init_score_batch.data();
}

const int32_t* groups_ptr = nullptr;
if (groups) {
groups_ptr = groups->data();
}

auto start_time = std::chrono::steady_clock::now();

for (int32_t i = 0; i < nrows; i += batch_count) {
// Use multiple threads to test concurrency
int thread_count = 2;
if (nrows == batch_count) {
thread_count = 1; // If pushing all rows in 1 batch, we cannot have multiple threads
}
std::vector<std::thread> threads;
threads.reserve(thread_count);
// Each worker pushes its own contiguous half of the rows; the row range is
// derived from (thread_count, t) inside PushSparseBatch.
for (int32_t t = 0; t < thread_count; ++t) {
std::thread th(TestUtils::PushSparseBatch,
dataset_handle,
nrows,
nclasses,
batch_count,
indptr,
indptr_ptr,
indices_ptr,
values_ptr,
labels_ptr,
weights_ptr,
init_scores,
groups_ptr,
thread_count,
t);
// NOTE(review): unqualified move() resolves only via ADL on std::thread —
// prefer std::move(th), or construct in place with threads.emplace_back(...).
threads.push_back(move(th));
}

for (auto& t : threads) t.join();

auto cur_time = std::chrono::steady_clock::now();
// NOTE(review): passing a std::chrono duration through printf-style varargs
// with %d is undefined behavior; log a plain integer instead, e.g.
// std::chrono::duration_cast<std::chrono::milliseconds>(cur_time - start_time).count().
Log::Info(" Time: %d", cur_time - start_time);
}

/*!
 * Pushes data from 1 thread into a Dataset based on thread_id and nrows.
 * e.g. with 100 rows and 2 threads, thread 0 will push rows 0-49, and thread 1 will push rows 50-99.
 * Note that rows are still pushed in microbatches within their range.
 * NOTE(review): this span is a GitHub diff capture — the old (removed) C API
 * call and UI markers are interleaved with the added code below.
 */
void TestUtils::PushSparseBatch(DatasetHandle dataset_handle,
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<int32_t>* indptr,
const int32_t* indptr_ptr,
const int32_t* indices_ptr,
const double* values_ptr,
const float* labels_ptr,
const float* weights_ptr,
const std::vector<double>* init_scores,
const int32_t* groups_ptr,
int32_t thread_count,
int32_t thread_id) {
// Rows are split evenly across threads. Integer division means any remainder
// rows (nrows % thread_count) would never be pushed — NOTE(review): confirm
// callers always use a thread_count that divides nrows.
int32_t threadChunkSize = nrows / thread_count;
int32_t startIndex = threadChunkSize * thread_id;
int32_t stopIndex = startIndex + threadChunkSize;

// Advance the per-row pointers to this thread's first row; indptr is indexed
// by row boundary, so the same row offset applies to it as well.
indptr_ptr += threadChunkSize * thread_id;
labels_ptr += threadChunkSize * thread_id;
if (weights_ptr) {
weights_ptr += threadChunkSize * thread_id;
}
if (groups_ptr) {
groups_ptr += threadChunkSize * thread_id;
}

for (int32_t i = startIndex; i < stopIndex; i += batch_count) {
// Since init_scores are in a column format, but need to be pushed as rows, we have to extract each batch
std::vector<double> init_score_batch;
const double* init_scores_ptr = nullptr;
if (init_scores) {
init_score_batch.reserve(nclasses * batch_count);
init_scores_ptr = CreateInitScoreBatch(&init_score_batch, i, nrows, nclasses, batch_count, init_scores);
}

// Nonzero element count for this microbatch, taken from the CSR offsets.
// NOTE(review): `i + batch_count - 1` looks off by one for a standard CSR
// indptr (expected indptr[i + batch_count] - indptr[i]); verify against the
// layout produced by CreateRandomSparseData.
int32_t nelem = indptr->at(i + batch_count - 1) - indptr->at(i);

result = LGBM_DatasetPushRowsByCSRWithMetadata(dataset_handle,
indptr_ptr,
2,
indices_ptr,
values_ptr,
1,
batch_count + 1,
nelem,
i,
labels_ptr,
weights_ptr,
init_scores_ptr,
groups_ptr,
0);
// The last argument changed from 0 to thread_id: each thread now identifies
// itself to the C API so its rows go into its own internal push buffer.
int result = LGBM_DatasetPushRowsByCSRWithMetadata(dataset_handle,
indptr_ptr,
2,
indices_ptr,
values_ptr,
1,
batch_count + 1,
nelem,
i,
labels_ptr,
weights_ptr,
init_scores_ptr,
groups_ptr,
thread_id);
EXPECT_EQ(0, result) << "LGBM_DatasetPushRowsByCSRWithMetadata result code: " << result;
if (result != 0) {
FAIL() << "LGBM_DatasetPushRowsByCSRWithMetadata failed"; // This forces an immediate failure, which EXPECT_EQ does not
Expand All @ @@ -283,11 +345,9 @@
groups_ptr += batch_count;
}
}

auto cur_time = std::chrono::steady_clock::now();
Log::Info(" Time: %d", cur_time - start_time);
}


void TestUtils::AssertMetadata(const Metadata* metadata,
const std::vector<float>* ref_labels,
const std::vector<float>* ref_weights,
Expand All @@ -296,7 +356,7 @@ namespace LightGBM {
const float* labels = metadata->label();
auto nTotal = static_cast<int32_t>(ref_labels->size());
for (auto i = 0; i < nTotal; i++) {
EXPECT_EQ(ref_labels->at(i), labels[i]) << "Inserted data: " << ref_labels->at(i);
EXPECT_EQ(ref_labels->at(i), labels[i]) << "Inserted data: " << ref_labels->at(i) << " at " << i;
if (ref_labels->at(i) != labels[i]) {
FAIL() << "Mismatched labels"; // This forces an immediate failure, which EXPECT_EQ does not
}
Expand Down
Loading