Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-429] TPC-DS Q14a/b get slowed down within setting spark.oap.sql.columnar.sortmergejoin.lazyread=true #432

Merged
merged 15 commits into from
Aug 4, 2021
89 changes: 29 additions & 60 deletions native-sql-engine/cpp/src/codegen/common/relation_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ class RelationColumn {
int field_id) {
return arrow::Status::NotImplemented("RelationColumn AppendColumn is abstract.");
};
virtual arrow::Status AdvanceTo(int array_id) {
return arrow::Status::NotImplemented("RelationColumn Advance is abstract.");
};
virtual void AdvanceTo(int array_id) { return; };
virtual int32_t Advance(int32_t array_offset) { return -1; };
virtual arrow::Status ReleaseArray(int array_id) = 0;
virtual arrow::Status GetArrayVector(std::vector<std::shared_ptr<arrow::Array>>* out) {
Expand Down Expand Up @@ -147,10 +145,8 @@ class TypedRelationColumn<DataType, enable_if_number_or_decimal<DataType>>
}
return arrow::Status::OK();
}
virtual T GetValue(int array_id, int id) {
return array_vector_[array_id]->GetView(id);
}
bool HasNull() { return has_null_; }
T GetValue(int array_id, int id) { return array_vector_[array_id]->GetView(id); }
virtual bool HasNull() { return has_null_; }

private:
using ArrayType = typename TypeTraits<DataType>::ArrayType;
Expand Down Expand Up @@ -190,10 +186,10 @@ class TypedRelationColumn<DataType, enable_if_string_like<DataType>>
}
return arrow::Status::OK();
}
virtual std::string GetValue(int array_id, int id) {
std::string GetValue(int array_id, int id) {
return array_vector_[array_id]->GetString(id);
}
bool HasNull() { return has_null_; }
virtual bool HasNull() { return has_null_; }

private:
std::vector<std::shared_ptr<StringArray>> array_vector_;
Expand All @@ -213,37 +209,32 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_number_or_decimal<DataType
using T = typename TypeTraits<DataType>::CType;
TypedLazyLoadRelationColumn() = default;

bool IsNull(int array_id, int id) override {
AdvanceTo(array_id);
return delegated.IsNull(array_id, id);
}

bool IsEqualTo(int x_array_id, int x_id, int y_array_id, int y_id) override {
AdvanceTo(x_array_id);
AdvanceTo(y_array_id);
return delegated.IsEqualTo(x_array_id, x_id, y_array_id, y_id);
}

arrow::Status FromLazyBatchIterator(std::shared_ptr<LazyBatchIterator> in,
int field_id) override {
in_ = in;
field_id_ = field_id;
AdvanceTo(0);
has_null_ = TypedRelationColumn<DataType>::HasNull();
return arrow::Status::OK();
};

arrow::Status AdvanceTo(int array_id) override {
void AdvanceTo(int array_id) override {
if (array_id < current_array_id_) {
return arrow::Status::OK();
return;
}
for (int i = current_array_id_; i <= array_id; i++) {
std::shared_ptr<arrow::RecordBatch> batch = in_->GetBatch(i);
if (batch == nullptr) {
current_array_id_ = i;
return;
}
std::shared_ptr<arrow::Array> array = batch->column(field_id_);
delegated.AppendColumn(array);
TypedRelationColumn<DataType>::AppendColumn(array);
in_->RetainBatch(i);
array_released.push_back(false);
}
current_array_id_ = array_id + 1;
return arrow::Status::OK();
return;
}

// return actual advanced array count
Expand All @@ -255,7 +246,7 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_number_or_decimal<DataType
return i;
}
std::shared_ptr<arrow::Array> array = batch->column(field_id_);
delegated.AppendColumn(array);
TypedRelationColumn<DataType>::AppendColumn(array);
in_->RetainBatch(target_batch);
array_released.push_back(false);
current_array_id_++;
Expand All @@ -270,25 +261,16 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_number_or_decimal<DataType
if (array_released[array_id]) {
return arrow::Status::OK();
}
RETURN_NOT_OK(delegated.ReleaseArray(array_id));
RETURN_NOT_OK(TypedRelationColumn<DataType>::ReleaseArray(array_id));
in_->ReleaseBatch(array_id);
array_released[array_id] = true;
return arrow::Status::OK();
}

arrow::Status GetArrayVector(std::vector<std::shared_ptr<arrow::Array>>* out) override {
return delegated.GetArrayVector(out);
}

T GetValue(int array_id, int id) override {
AdvanceTo(array_id);
return delegated.GetValue(array_id, id);
}
bool HasNull() override { return has_null_; }

private:
std::shared_ptr<LazyBatchIterator> in_;
TypedRelationColumn<DataType> delegated;
int current_array_id_ = 0;
int field_id_ = -1;
std::vector<bool> array_released;
Expand All @@ -300,36 +282,33 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_string_like<DataType>>
: public TypedRelationColumn<DataType> {
public:
TypedLazyLoadRelationColumn() = default;
bool IsNull(int array_id, int id) override {
AdvanceTo(array_id);
return delegated.IsNull(array_id, id);
}
bool IsEqualTo(int x_array_id, int x_id, int y_array_id, int y_id) override {
AdvanceTo(x_array_id);
AdvanceTo(y_array_id);
return delegated.IsEqualTo(x_array_id, x_id, y_array_id, y_id);
}

arrow::Status FromLazyBatchIterator(std::shared_ptr<LazyBatchIterator> in,
int field_id) override {
in_ = in;
field_id_ = field_id;
AdvanceTo(0);
has_null_ = TypedRelationColumn<DataType>::HasNull();
return arrow::Status::OK();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be able to apply below patch to optimize the null check

diff --git a/native-sql-engine/cpp/src/codegen/common/relation_column.h b/native-sql-engine/cpp/src/codegen/common/relation_column.h
index 33bf151d..66e78601 100644
--- a/native-sql-engine/cpp/src/codegen/common/relation_column.h
+++ b/native-sql-engine/cpp/src/codegen/common/relation_column.h
@@ -216,6 +216,8 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_number_or_decimal<DataType
     in_ = in;
     field_id_ = field_id;
     return arrow::Status::OK();
+    AdvanceTo(0);
+    has_null_ = TypedRelationColumn<DataType>::HasNull();
   };

   void AdvanceTo(int array_id) override {

};

arrow::Status AdvanceTo(int array_id) override {
void AdvanceTo(int array_id) override {
if (array_id < current_array_id_) {
return arrow::Status::OK();
return;
}
for (int i = current_array_id_; i <= array_id; i++) {
std::shared_ptr<arrow::RecordBatch> batch = in_->GetBatch(i);
if (batch == nullptr) {
current_array_id_ = i;
return;
}
std::shared_ptr<arrow::Array> array = batch->column(field_id_);
delegated.AppendColumn(array);
TypedRelationColumn<DataType>::AppendColumn(array);
in_->RetainBatch(i);
array_released.push_back(false);
}
current_array_id_ = array_id + 1;
return arrow::Status::OK();
return;
}

// return actual advanced array count
Expand All @@ -341,7 +320,7 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_string_like<DataType>>
return i;
}
std::shared_ptr<arrow::Array> array = batch->column(field_id_);
delegated.AppendColumn(array);
TypedRelationColumn<DataType>::AppendColumn(array);
in_->RetainBatch(target_batch);
array_released.push_back(false);
current_array_id_++;
Expand All @@ -356,26 +335,16 @@ class TypedLazyLoadRelationColumn<DataType, enable_if_string_like<DataType>>
if (array_released[array_id]) {
return arrow::Status::OK();
}
RETURN_NOT_OK(delegated.ReleaseArray(array_id));
RETURN_NOT_OK(TypedRelationColumn<DataType>::ReleaseArray(array_id));
in_->ReleaseBatch(array_id);
array_released[array_id] = true;
return arrow::Status::OK();
}

arrow::Status GetArrayVector(std::vector<std::shared_ptr<arrow::Array>>* out) override {
return delegated.GetArrayVector(out);
}

std::string GetValue(int array_id, int id) override {
AdvanceTo(array_id);
return delegated.GetValue(array_id, id);
}

bool HasNull() override { return has_null_; }

private:
std::shared_ptr<LazyBatchIterator> in_;
TypedRelationColumn<DataType> delegated;
int32_t current_array_id_ = 0;
int32_t field_id_ = -1;
std::vector<bool> array_released;
Expand Down
Loading