diff --git a/native-sql-engine/cpp/src/codegen/common/relation_column.h b/native-sql-engine/cpp/src/codegen/common/relation_column.h index a2fde5a5b..f2518bc5d 100644 --- a/native-sql-engine/cpp/src/codegen/common/relation_column.h +++ b/native-sql-engine/cpp/src/codegen/common/relation_column.h @@ -100,9 +100,7 @@ class RelationColumn { int field_id) { return arrow::Status::NotImplemented("RelationColumn AppendColumn is abstract."); }; - virtual arrow::Status AdvanceTo(int array_id) { - return arrow::Status::NotImplemented("RelationColumn Advance is abstract."); - }; + virtual void AdvanceTo(int array_id) { return; }; virtual int32_t Advance(int32_t array_offset) { return -1; }; virtual arrow::Status ReleaseArray(int array_id) = 0; virtual arrow::Status GetArrayVector(std::vector>* out) { @@ -147,10 +145,8 @@ class TypedRelationColumn> } return arrow::Status::OK(); } - virtual T GetValue(int array_id, int id) { - return array_vector_[array_id]->GetView(id); - } - bool HasNull() { return has_null_; } + T GetValue(int array_id, int id) { return array_vector_[array_id]->GetView(id); } + virtual bool HasNull() { return has_null_; } private: using ArrayType = typename TypeTraits::ArrayType; @@ -190,10 +186,10 @@ class TypedRelationColumn> } return arrow::Status::OK(); } - virtual std::string GetValue(int array_id, int id) { + std::string GetValue(int array_id, int id) { return array_vector_[array_id]->GetString(id); } - bool HasNull() { return has_null_; } + virtual bool HasNull() { return has_null_; } private: std::vector> array_vector_; @@ -213,37 +209,32 @@ class TypedLazyLoadRelationColumn::CType; TypedLazyLoadRelationColumn() = default; - bool IsNull(int array_id, int id) override { - AdvanceTo(array_id); - return delegated.IsNull(array_id, id); - } - - bool IsEqualTo(int x_array_id, int x_id, int y_array_id, int y_id) override { - AdvanceTo(x_array_id); - AdvanceTo(y_array_id); - return delegated.IsEqualTo(x_array_id, x_id, y_array_id, y_id); - } - arrow::Status FromLazyBatchIterator(std::shared_ptr in, int field_id) override { in_ = in; field_id_ = field_id; + AdvanceTo(0); + has_null_ = TypedRelationColumn::HasNull(); return arrow::Status::OK(); }; - arrow::Status AdvanceTo(int array_id) override { + void AdvanceTo(int array_id) override { if (array_id < current_array_id_) { - return arrow::Status::OK(); + return; } for (int i = current_array_id_; i <= array_id; i++) { std::shared_ptr batch = in_->GetBatch(i); + if (batch == nullptr) { + current_array_id_ = i; + return; + } std::shared_ptr array = batch->column(field_id_); - delegated.AppendColumn(array); + TypedRelationColumn::AppendColumn(array); in_->RetainBatch(i); array_released.push_back(false); } current_array_id_ = array_id + 1; - return arrow::Status::OK(); + return; } // return actual advanced array count @@ -255,7 +246,7 @@ class TypedLazyLoadRelationColumn array = batch->column(field_id_); - delegated.AppendColumn(array); + TypedRelationColumn::AppendColumn(array); in_->RetainBatch(target_batch); array_released.push_back(false); current_array_id_++; @@ -270,25 +261,16 @@ class TypedLazyLoadRelationColumn::ReleaseArray(array_id)); in_->ReleaseBatch(array_id); array_released[array_id] = true; return arrow::Status::OK(); } - arrow::Status GetArrayVector(std::vector>* out) override { - return delegated.GetArrayVector(out); - } - - T GetValue(int array_id, int id) override { - AdvanceTo(array_id); - return delegated.GetValue(array_id, id); - } bool HasNull() override { return has_null_; } private: std::shared_ptr in_; - TypedRelationColumn delegated; int current_array_id_ = 0; int field_id_ = -1; std::vector array_released; @@ -300,36 +282,33 @@ class TypedLazyLoadRelationColumn> : public TypedRelationColumn { public: TypedLazyLoadRelationColumn() = default; - bool IsNull(int array_id, int id) override { - AdvanceTo(array_id); - return delegated.IsNull(array_id, id); - } - bool IsEqualTo(int x_array_id, int x_id, int y_array_id, int y_id) override { - AdvanceTo(x_array_id); - AdvanceTo(y_array_id); - return delegated.IsEqualTo(x_array_id, x_id, y_array_id, y_id); - } arrow::Status FromLazyBatchIterator(std::shared_ptr in, int field_id) override { in_ = in; field_id_ = field_id; + AdvanceTo(0); + has_null_ = TypedRelationColumn::HasNull(); return arrow::Status::OK(); }; - arrow::Status AdvanceTo(int array_id) override { + void AdvanceTo(int array_id) override { if (array_id < current_array_id_) { - return arrow::Status::OK(); + return; } for (int i = current_array_id_; i <= array_id; i++) { std::shared_ptr batch = in_->GetBatch(i); + if (batch == nullptr) { + current_array_id_ = i; + return; + } std::shared_ptr array = batch->column(field_id_); - delegated.AppendColumn(array); + TypedRelationColumn::AppendColumn(array); in_->RetainBatch(i); array_released.push_back(false); } current_array_id_ = array_id + 1; - return arrow::Status::OK(); + return; } // return actual advanced array count @@ -341,7 +320,7 @@ class TypedLazyLoadRelationColumn> return i; } std::shared_ptr array = batch->column(field_id_); - delegated.AppendColumn(array); + TypedRelationColumn::AppendColumn(array); in_->RetainBatch(target_batch); array_released.push_back(false); current_array_id_++; @@ -356,26 +335,16 @@ class TypedLazyLoadRelationColumn> if (array_released[array_id]) { return arrow::Status::OK(); } - RETURN_NOT_OK(delegated.ReleaseArray(array_id)); + RETURN_NOT_OK(TypedRelationColumn::ReleaseArray(array_id)); in_->ReleaseBatch(array_id); array_released[array_id] = true; return arrow::Status::OK(); } - arrow::Status GetArrayVector(std::vector>* out) override { - return delegated.GetArrayVector(out); - } - - std::string GetValue(int array_id, int id) override { - AdvanceTo(array_id); - return delegated.GetValue(array_id, id); - } - bool HasNull() override { return has_null_; } private: std::shared_ptr in_; - TypedRelationColumn delegated; int32_t current_array_id_ = 0; int32_t field_id_ = -1; std::vector array_released; diff --git a/native-sql-engine/cpp/src/codegen/common/sort_relation.h b/native-sql-engine/cpp/src/codegen/common/sort_relation.h index dc7e95355..33a921f13 100644 --- a/native-sql-engine/cpp/src/codegen/common/sort_relation.h +++ b/native-sql-engine/cpp/src/codegen/common/sort_relation.h @@ -64,6 +64,8 @@ class SortRelation { } array_id++; } + } else { + ArrayAdvanceTo(0); } } @@ -89,9 +91,6 @@ class SortRelation { } void ArrayRelease(int array_id) { - for (auto col : sort_relation_key_list_) { - col->ReleaseArray(array_id); - } for (auto col : sort_relation_payload_list_) { col->ReleaseArray(array_id); } @@ -113,31 +112,31 @@ class SortRelation { } void ArrayAdvanceTo(int array_id) { - for (auto col : sort_relation_key_list_) { - col->Advance(array_id); + if (array_id <= fetched_batches_) { + return; } + int32_t fetching = (array_id / 16 + 1) * 16; for (auto col : sort_relation_payload_list_) { - col->Advance(array_id); + col->AdvanceTo(fetching); } + fetched_batches_ = fetching; } void Advance(int shift) { - int64_t batch_length = lazy_in_->GetNumRowsOfBatch(requested_batches); + int64_t batch_length = lazy_in_->GetNumRowsOfBatch(requested_batches_); int64_t batch_remaining = (batch_length - 1) - offset_in_current_batch_; if (shift <= batch_remaining) { offset_in_current_batch_ = offset_in_current_batch_ + shift; return; } int64_t remaining = shift - batch_remaining; - int32_t batch_i = requested_batches + 1; + int32_t batch_i = requested_batches_ + 1; while (true) { int64_t current_batch_length = lazy_in_->GetNumRowsOfBatch(batch_i); if (remaining <= current_batch_length) { - requested_batches = batch_i; - ArrayAdvanceTo(requested_batches); - for (int32_t i = 0; i < requested_batches; i++) { - ArrayRelease(i); - } + requested_batches_ = batch_i; + ArrayAdvanceTo(requested_batches_); + ReleaseAllRead(); offset_in_current_batch_ = remaining - 1; return; } @@ -146,22 +145,59 @@ class SortRelation { } } + void ReleaseAllRead() { + if (requested_batches_ > released_batches_ + 16) { + return; + } + for (int32_t i = released_batches_ + 1; i < requested_batches_; i++) { + ArrayRelease(i); + released_batches_ = i; + } + } + ArrayItemIndexS GetItemIndexWithShift(int shift) { + // std::cout << "DEBUG -> GetItemIndexWithShift: " << shift << std::endl; if (!is_lazy_input_) { return indices_begin_[offset_ + shift]; } - int64_t batch_length = lazy_in_->GetNumRowsOfBatch(requested_batches); - int64_t batch_remaining = (batch_length - 1) - offset_in_current_batch_; - if (shift <= batch_remaining) { - ArrayItemIndexS s(requested_batches, offset_in_current_batch_ + shift); + int64_t batch_length_0; + int64_t batch_remaining_0; + int64_t offset_in_current_batch_0; + int32_t shift_0; + int32_t requested_batches_0; + + if (shift == last_shifted_) { + ArrayItemIndexS s(shift_cache_aid_, shift_cache_rid_); return s; + } else if (last_shifted_ >= 0 && shift > last_shifted_) { + requested_batches_0 = shift_cache_aid_; + offset_in_current_batch_0 = shift_cache_rid_; + shift_0 = shift - last_shifted_; + } else { + requested_batches_0 = requested_batches_; + offset_in_current_batch_0 = offset_in_current_batch_; + shift_0 = shift; } - int64_t remaining = shift - batch_remaining; - int32_t batch_i = requested_batches + 1; + batch_length_0 = lazy_in_->GetNumRowsOfBatch(requested_batches_0); + batch_remaining_0 = (batch_length_0 - 1) - offset_in_current_batch_0; + if (shift_0 <= batch_remaining_0) { + int64_t rid = offset_in_current_batch_0 + shift_0; + ArrayItemIndexS s(requested_batches_0, rid); + last_shifted_ = shift; + shift_cache_aid_ = requested_batches_0; + shift_cache_rid_ = rid; + return s; + } + int64_t remaining = shift_0 - batch_remaining_0; + int32_t batch_i = requested_batches_0 + 1; while (true) { int64_t current_batch_length = lazy_in_->GetNumRowsOfBatch(batch_i); if (remaining <= current_batch_length) { - ArrayItemIndexS s(batch_i, remaining - 1); + int64_t rid = remaining - 1; + ArrayItemIndexS s(batch_i, rid); + last_shifted_ = shift; + shift_cache_aid_ = batch_i; + shift_cache_rid_ = rid; return s; } remaining -= current_batch_length; @@ -173,25 +209,53 @@ class SortRelation { if (!is_lazy_input_) { return offset_ + shift < items_total_; } - int64_t batch_length = lazy_in_->GetNumRowsOfBatch(requested_batches); - if (batch_length == -1L) { + + int64_t batch_length_0; + int64_t batch_remaining_0; + int64_t offset_in_current_batch_0; + int32_t shift_0; + int32_t requested_batches_0; + + if (shift == rb_last_shifted_) { + return true; + } else if (rb_last_shifted_ >= 0 && shift > rb_last_shifted_) { + requested_batches_0 = rb_shift_cache_aid_; + offset_in_current_batch_0 = rb_shift_cache_rid_; + shift_0 = shift - rb_last_shifted_; + } else { + requested_batches_0 = requested_batches_; + offset_in_current_batch_0 = offset_in_current_batch_; + shift_0 = shift; + } + batch_length_0 = lazy_in_->GetNumRowsOfBatch(requested_batches_0); + batch_remaining_0 = (batch_length_0 - 1) - offset_in_current_batch_0; + + if (batch_length_0 == -1L) { return false; } - int64_t batch_remaining = (batch_length - 1) - offset_in_current_batch_; - if (shift <= batch_remaining) { + if (shift_0 <= batch_remaining_0) { + rb_last_shifted_ = shift; + rb_shift_cache_aid_ = requested_batches_0; + rb_shift_cache_rid_ = offset_in_current_batch_0 + shift_0; return true; } - int64_t remaining = shift - batch_remaining; - int32_t batch_i = requested_batches + 1; - while (remaining >= 0) { + int64_t remaining = shift_0 - batch_remaining_0; + int32_t batch_i = requested_batches_0 + 1; + while (true) { int64_t current_batch_length = lazy_in_->GetNumRowsOfBatch(batch_i); if (current_batch_length == -1L) { return false; } + ArrayAdvanceTo(batch_i); + if (remaining <= current_batch_length) { + rb_last_shifted_ = shift; + rb_shift_cache_aid_ = batch_i; + rb_shift_cache_rid_ = remaining - 1; + return true; + } remaining -= current_batch_length; batch_i++; } - return true; } // IS THIS POSSIBLY BUGGY AS THE FIRST ELEMENT DID NOT GET CHECKED? @@ -200,12 +264,16 @@ class SortRelation { if (!CheckRangeBound(1)) return false; offset_++; range_cache_ = -1; + last_shifted_ = -1; + rb_last_shifted_ = -1; return true; } if (!CheckRangeBound(1)) return false; Advance(1); offset_++; range_cache_ = -1; + last_shifted_ = -1; + rb_last_shifted_ = -1; return true; } @@ -215,6 +283,8 @@ class SortRelation { if (!CheckRangeBound(range)) return false; offset_ += range; range_cache_ = -1; + last_shifted_ = -1; + rb_last_shifted_ = -1; return true; } auto range = GetSameKeyRange(); @@ -222,6 +292,8 @@ class SortRelation { Advance(range); offset_ += range; range_cache_ = -1; + last_shifted_ = -1; + rb_last_shifted_ = -1; return true; } @@ -233,6 +305,9 @@ class SortRelation { bool is_same = true; while (is_same) { if (CheckRangeBound(range + 1)) { + // std::cout << "DEBUG -> rb_last_shifted_: " << rb_last_shifted_ << ", + // rb_shift_cache_aid_: " << rb_shift_cache_aid_ << ", rb_shift_cache_rid_: " << + // rb_shift_cache_rid_ << std::endl; auto cur_idx = GetItemIndexWithShift(range); auto cur_idx_plus_one = GetItemIndexWithShift(range + 1); for (auto col : sort_relation_key_list_) { @@ -286,13 +361,22 @@ class SortRelation { std::shared_ptr lazy_in_; uint64_t offset_ = 0; int64_t offset_in_current_batch_ = 0; - int32_t requested_batches = 0; + int32_t requested_batches_ = 0; int range_cache_ = -1; std::vector> sort_relation_key_list_; std::vector> sort_relation_payload_list_; - // required by legacy method + // flags and caches bool is_lazy_input_ = false; + int32_t fetched_batches_ = -1; + int32_t released_batches_ = -1; + int32_t last_shifted_ = -1; + int32_t shift_cache_aid_ = -1; + int64_t shift_cache_rid_ = -1; + + int32_t rb_last_shifted_ = -1; + int32_t rb_shift_cache_aid_ = -1; + int64_t rb_shift_cache_rid_ = -1; std::shared_ptr indices_buf_; ArrayItemIndexS* indices_begin_;