Skip to content

Commit

Permalink
[NSE-692] JoinBenchmark is broken (oap-project#693)
Browse files (browse the repository at this point in the history)
  • Loading branch information
zhztheplayer authored Jan 12, 2022
1 parent 5fef419 commit c4052de
Showing 1 changed file with 44 additions and 40 deletions.
Original file line number | Diff line number | Diff line change
@@ -189,16 +189,17 @@ TEST_F(BenchmarkArrowComputeJoin, JoinBenchmark) {
uint64_t num_batches = 0;
uint64_t num_rows = 0;

do {
while (true) {
TIME_MICRO_OR_THROW(elapse_left_read,
left_record_batch_reader->ReadNext(&left_record_batch));
if (left_record_batch) {
TIME_MICRO_OR_THROW(elapse_eval,
expr_build->evaluate(left_record_batch, &dummy_result_batches));
num_batches += 1;
if (!left_record_batch) {
break;
}
} while (left_record_batch);
std::cout << "Readed left table with " << num_batches << " batches." << std::endl;
TIME_MICRO_OR_THROW(elapse_eval,
expr_build->evaluate(left_record_batch, &dummy_result_batches));
num_batches += 1;
}
std::cout << "Read left table with " << num_batches << " batches." << std::endl;

TIME_MICRO_OR_THROW(elapse_finish, expr_build->finish(&build_result_iterator));
TIME_MICRO_OR_THROW(elapse_finish, expr_probe->finish(&probe_result_iterator_base));
@@ -210,22 +211,23 @@ TEST_F(BenchmarkArrowComputeJoin, JoinBenchmark) {
num_batches = 0;
uint64_t num_output_batches = 0;
std::shared_ptr<arrow::RecordBatch> out;
do {
while (true) {
TIME_MICRO_OR_THROW(elapse_right_read,
right_record_batch_reader->ReadNext(&right_record_batch));
if (right_record_batch) {
std::vector<std::shared_ptr<arrow::Array>> right_column_vector;
for (int i = 0; i < right_record_batch->num_columns(); i++) {
right_column_vector.push_back(right_record_batch->column(i));
}
TIME_MICRO_OR_THROW(elapse_probe_process,
probe_result_iterator->Process(right_column_vector, &out));
num_batches += 1;
num_output_batches++;
num_rows += out->num_rows();
if (!right_record_batch) {
break;
}
std::vector<std::shared_ptr<arrow::Array>> right_column_vector;
for (int i = 0; i < right_record_batch->num_columns(); i++) {
right_column_vector.push_back(right_record_batch->column(i));
}
} while (right_record_batch);
std::cout << "Readed right table with " << num_batches << " batches." << std::endl;
TIME_MICRO_OR_THROW(elapse_probe_process,
probe_result_iterator->Process(right_column_vector, &out));
num_batches += 1;
num_output_batches++;
num_rows += out->num_rows();
}
std::cout << "Read right table with " << num_batches << " batches." << std::endl;

std::cout << "=========================================="
<< "\nBenchmarkArrowComputeJoin processed " << num_batches << " batches"
@@ -306,16 +308,17 @@ TEST_F(BenchmarkArrowComputeJoin, JoinBenchmarkWithCondition) {
elapse_gen, CreateCodeGenerator(ctx.memory_pool(), left_schema, {probeArrays_expr},
field_list, &expr_probe, true));

do {
while (true) {
TIME_MICRO_OR_THROW(elapse_left_read,
left_record_batch_reader->ReadNext(&left_record_batch));
if (left_record_batch) {
TIME_MICRO_OR_THROW(elapse_eval,
expr_probe->evaluate(left_record_batch, &dummy_result_batches));
num_batches += 1;
if (!left_record_batch) {
break;
}
} while (left_record_batch);
std::cout << "Readed left table with " << num_batches << " batches." << std::endl;
TIME_MICRO_OR_THROW(elapse_eval,
expr_probe->evaluate(left_record_batch, &dummy_result_batches));
num_batches += 1;
}
std::cout << "Read left table with " << num_batches << " batches." << std::endl;

TIME_MICRO_OR_THROW(elapse_finish, expr_probe->finish(&probe_result_iterator_base));
auto probe_result_iterator =
@@ -325,22 +328,23 @@ TEST_F(BenchmarkArrowComputeJoin, JoinBenchmarkWithCondition) {
num_batches = 0;
uint64_t num_output_batches = 0;
std::shared_ptr<arrow::RecordBatch> out;
do {
while (true) {
TIME_MICRO_OR_THROW(elapse_right_read,
right_record_batch_reader->ReadNext(&right_record_batch));
if (right_record_batch) {
std::vector<std::shared_ptr<arrow::Array>> right_column_vector;
for (int i = 0; i < right_record_batch->num_columns(); i++) {
right_column_vector.push_back(right_record_batch->column(i));
}
TIME_MICRO_OR_THROW(elapse_probe_process,
probe_result_iterator->Process(right_column_vector, &out));
num_batches += 1;
num_output_batches++;
num_rows += out->num_rows();
if (!right_record_batch) {
break;
}
std::vector<std::shared_ptr<arrow::Array>> right_column_vector;
for (int i = 0; i < right_record_batch->num_columns(); i++) {
right_column_vector.push_back(right_record_batch->column(i));
}
} while (right_record_batch);
std::cout << "Readed right table with " << num_batches << " batches." << std::endl;
TIME_MICRO_OR_THROW(elapse_probe_process,
probe_result_iterator->Process(right_column_vector, &out));
num_batches += 1;
num_output_batches++;
num_rows += out->num_rows();
}
std::cout << "Read right table with " << num_batches << " batches." << std::endl;

std::cout << "=========================================="
<< "\nBenchmarkArrowComputeJoin processed " << num_batches << " batches"
Expand Down

0 comments on commit c4052de

Please sign in to comment.