Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Enhance seq iter. (#789)
Browse files Browse the repository at this point in the history
* Refactor seq iter.

* Remove logical row.

* Remove join iter.

* Erase data directly.

* Fix union.

* Refactor cartesian product.

* Fix cartesian product and shortest path.

* Fix unstable erase.

* Rebase and fix join.

* Enable erase by swap test.

* Remove unneccesary code in sort.
  • Loading branch information
CPWstatic authored Mar 9, 2021
1 parent d04aae9 commit ed4af5b
Show file tree
Hide file tree
Showing 25 changed files with 204 additions and 755 deletions.
121 changes: 32 additions & 89 deletions src/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,9 @@ List GetNeighborsIter::getEdges() {

SequentialIter::SequentialIter(std::shared_ptr<Value> value) : Iterator(value, Kind::kSequential) {
DCHECK(value->isDataSet());
auto& ds = value->getDataSet();
for (auto& row : ds.rows) {
rows_.emplace_back(&row);
}
iter_ = rows_.begin();
auto& ds = value->mutableDataSet();
iter_ = ds.rows.begin();
rows_ = &ds.rows;
for (size_t i = 0; i < ds.colNames.size(); ++i) {
colIndices_.emplace(ds.colNames[i], i);
}
Expand All @@ -488,111 +486,56 @@ void SequentialIter::init(std::vector<std::unique_ptr<Iterator>>&& iterators) {
const auto& firstIter = iterators.front();
DCHECK(firstIter->isSequentialIter());
colIndices_ = static_cast<const SequentialIter*>(firstIter.get())->getColIndices();
DataSet ds;
for (auto& iter : iterators) {
DCHECK(iter->isSequentialIter());
auto inputIter = static_cast<SequentialIter*>(iter.get());
rows_.insert(rows_.end(),
ds.rows.insert(ds.rows.end(),
std::make_move_iterator(inputIter->begin()),
std::make_move_iterator(inputIter->end()));
}
iter_ = rows_.begin();
value_ = std::make_shared<Value>(std::move(ds));
rows_ = &value_->mutableDataSet().rows;
iter_ = rows_->begin();
}

const Value& SequentialIter::getColumn(int32_t index) const {
return getColumnByIndex(index, iter_);
bool SequentialIter::valid() const {
return iter_ < rows_->end();
}

void JoinIter::joinIndex(const Iterator* lhs, const Iterator* rhs) {
size_t nextSeg = 0;
if (lhs != nullptr) {
switch (lhs->kind()) {
case Iterator::Kind::kSequential: {
nextSeg = buildIndexFromSeqIter(static_cast<const SequentialIter*>(lhs), 0);
break;
}
case Iterator::Kind::kJoin: {
nextSeg = buildIndexFromJoinIter(static_cast<const JoinIter*>(lhs), 0);
break;
}
case Iterator::Kind::kProp: {
nextSeg = buildIndexFromPropIter(static_cast<const PropIter*>(lhs), 0);
break;
}
case Iterator::Kind::kDefault:
case Iterator::Kind::kGetNeighbors: {
LOG(FATAL) << "Join Not Support " << lhs->kind();
break;
}
}
}
if (rhs == nullptr) {
return;
}
switch (rhs->kind()) {
case Iterator::Kind::kSequential: {
buildIndexFromSeqIter(static_cast<const SequentialIter*>(rhs), nextSeg);
break;
}
case Iterator::Kind::kJoin: {
buildIndexFromJoinIter(static_cast<const JoinIter*>(rhs), nextSeg);
break;
}
case Iterator::Kind::kProp: {
buildIndexFromPropIter(static_cast<const PropIter*>(rhs), nextSeg);
break;
}
case Iterator::Kind::kDefault:
case Iterator::Kind::kGetNeighbors: {
LOG(FATAL) << "Join Not Support " << lhs->kind();
break;
}
void SequentialIter::next() {
if (valid()) {
++iter_;
}
}

size_t JoinIter::buildIndexFromPropIter(const PropIter* iter, size_t segIdx) {
auto colIdxStart = colIdxIndices_.size();
for (auto& col : iter->getColIndices()) {
DCHECK_LT(col.second + colIdxStart, colNames_.size());
auto& colName = colNames_[col.second + colIdxStart];
colIndices_.emplace(colName, std::make_pair(segIdx, col.second));
colIdxIndices_.emplace(col.second + colIdxStart, std::make_pair(segIdx, col.second));
}
return segIdx + 1;
void SequentialIter::erase() {
iter_ = rows_->erase(iter_);
}

size_t JoinIter::buildIndexFromSeqIter(const SequentialIter* iter, size_t segIdx) {
auto colIdxStart = colIdxIndices_.size();
for (auto& col : iter->getColIndices()) {
DCHECK_LT(col.second + colIdxStart, colNames_.size());
auto& colName = colNames_[col.second + colIdxStart];
colIndices_.emplace(colName, std::make_pair(segIdx, col.second));
colIdxIndices_.emplace(col.second + colIdxStart, std::make_pair(segIdx, col.second));
}
return segIdx + 1;
void SequentialIter::unstableErase() {
std::swap(rows_->back(), *iter_);
rows_->pop_back();
}

size_t JoinIter::buildIndexFromJoinIter(const JoinIter* iter, size_t segIdx) {
auto colIdxStart = colIdxIndices_.size();
size_t nextSeg = 0;
if (iter->getColIndices().empty()) {
return nextSeg;
void SequentialIter::eraseRange(size_t first, size_t last) {
if (first >= last || first >= size()) {
return;
}

for (auto& col : iter->getColIdxIndices()) {
auto oldSeg = col.second.first;
size_t newSeg = oldSeg + segIdx;
if (newSeg > nextSeg) {
nextSeg = newSeg;
}
DCHECK_LT(col.first + colIdxStart, colNames_.size());
auto& colName = colNames_[col.first + colIdxStart];
colIndices_.emplace(colName, std::make_pair(newSeg, col.second.second));
colIdxIndices_.emplace(col.first + colIdxStart, std::make_pair(newSeg, col.second.second));
if (last > size()) {
rows_->erase(rows_->begin() + first, rows_->end());
} else {
rows_->erase(rows_->begin() + first, rows_->begin() + last);
}
return nextSeg + 1;
reset();
}

void SequentialIter::doReset(size_t pos) {
DCHECK((pos == 0 && size() == 0) || (pos < size()));
iter_ = rows_->begin() + pos;
}

const Value& JoinIter::getColumn(int32_t index) const {
const Value& SequentialIter::getColumn(int32_t index) const {
return getColumnByIndex(index, iter_);
}

Expand Down
Loading

0 comments on commit ed4af5b

Please sign in to comment.