Skip to content

Commit

Permalink
fix iterators may not be closed (#43)
Browse files Browse the repository at this point in the history
Change-Id: I3709d96fd2114fa782d6f28f8853b32d65fbd22b
  • Loading branch information
javeme authored and zhoney committed Jan 8, 2020
1 parent 24b8a33 commit 4bd64ce
Show file tree
Hide file tree
Showing 14 changed files with 790 additions and 65 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.7.1</version>
<version>1.7.2</version>

<name>hugegraph-common</name>
<url>https://github.com/hugegraph/hugegraph-common</url>
Expand Down Expand Up @@ -218,7 +218,7 @@
<manifestEntries>
<!-- Must be on one line, otherwise the automatic
upgrade script cannot replace the version number -->
<Implementation-Version>1.7.1.0</Implementation-Version>
<Implementation-Version>1.7.2.0</Implementation-Version>
</manifestEntries>
</archive>
</configuration>
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.iterator;

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.google.common.collect.ImmutableList;

public class BatchMapperIterator<T, R> extends WrappedIterator<R> {

private final int batch;
private final Iterator<T> originIterator;
private final Function<List<T>, Iterator<R>> mapperCallback;

private Iterator<R> batchIterator;

public BatchMapperIterator(int batch, Iterator<T> origin,
Function<List<T>, Iterator<R>> mapper) {
E.checkArgument(batch > 0, "Expect batch > 0, but got %s", batch);
this.batch = batch;
this.originIterator = origin;
this.mapperCallback = mapper;
this.batchIterator = null;
}

@Override
protected Iterator<T> originIterator() {
return this.originIterator;
}

@Override
protected final boolean fetch() {
if (this.batchIterator != null && this.fetchFromBatch()) {
return true;
}

List<T> list = this.nextBatch();
if (!list.isEmpty()) {
assert this.batchIterator == null;
// Do fetch
this.batchIterator = this.mapperCallback.apply(list);
if (this.batchIterator != null && this.fetchFromBatch()) {
return true;
}
}
return false;
}

protected final List<T> nextBatch() {
if (!this.originIterator.hasNext()) {
return ImmutableList.of();
}
List<T> list = InsertionOrderUtil.newList();
for (int i = 0; i < this.batch && this.originIterator.hasNext(); i++) {
T next = this.originIterator.next();
list.add(next);
}
return list;
}

protected final boolean fetchFromBatch() {
E.checkNotNull(this.batchIterator, "mapper results");
while (this.batchIterator.hasNext()) {
R result = this.batchIterator.next();
if (result != null) {
assert this.current == none();
this.current = result;
return true;
}
}
this.resetBatchIterator();
return false;
}

protected final void resetBatchIterator() {
if (this.batchIterator == null) {
return;
}
close(this.batchIterator);
this.batchIterator = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,5 @@

import java.util.Iterator;

public class IterableIterator<T> implements Iterator<T> {

private final Iterable<T> iterable;
private final Iterator<T> iterator;

public IterableIterator(Iterable<T> iterable) {
this.iterable = iterable;
this.iterator = iterable.iterator();
}

@Override
public boolean hasNext() {
return this.iterator.hasNext();
}

@Override
public T next() {
return this.iterator.next();
}

@Override
public void remove() {
this.iterator.remove();
}

public Iterable<T> iterable() {
return iterable;
}
public interface CIter<R> extends Iterator<R>, AutoCloseable, Metadatable {
}
13 changes: 2 additions & 11 deletions src/main/java/com/baidu/hugegraph/iterator/ExtendableIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,20 @@

package com.baidu.hugegraph.iterator;

import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

import com.baidu.hugegraph.util.E;

public class ExtendableIterator<T> extends WrappedIterator<T> {

private final Deque<Iterator<T>> itors;
private final List<Iterator<T>> removedItors;

private Iterator<T> currentIterator;

public ExtendableIterator() {
this.itors = new ConcurrentLinkedDeque<>();
this.removedItors = new ArrayList<>();
this.currentIterator = null;
}

Expand All @@ -62,11 +58,6 @@ public ExtendableIterator<T> extend(Iterator<T> iter) {

@Override
public void close() throws Exception {
for (Iterator<T> iter : this.removedItors) {
if (iter instanceof AutoCloseable) {
((AutoCloseable) iter).close();
}
}
for (Iterator<T> iter : this.itors) {
if (iter instanceof AutoCloseable) {
((AutoCloseable) iter).close();
Expand All @@ -75,7 +66,7 @@ public void close() throws Exception {
}

@Override
protected Iterator<?> originIterator() {
protected Iterator<T> originIterator() {
return this.currentIterator;
}

Expand All @@ -98,7 +89,7 @@ protected boolean fetch() {
// The last one
return false;
}
this.removedItors.add(this.itors.removeFirst());
close(this.itors.removeFirst());
}

assert first != null && first.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ public FlatMapperFilterIterator(Iterator<T> origin,
}

@Override
protected final boolean fetchMapped() {
E.checkNotNull(this.results, "mapper results");
while (this.results.hasNext()) {
R result = this.results.next();
protected final boolean fetchFromBatch() {
E.checkNotNull(this.batchIterator, "mapper results");
while (this.batchIterator.hasNext()) {
R result = this.batchIterator.next();
if (result != null && this.filterCallback.apply(result)) {
assert this.current == none();
this.current = result;
return true;
}
}
this.results = null;
this.resetBatchIterator();
return false;
}
}
39 changes: 27 additions & 12 deletions src/main/java/com/baidu/hugegraph/iterator/FlatMapperIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,48 +29,63 @@ public class FlatMapperIterator<T, R> extends WrappedIterator<R> {
private final Iterator<T> originIterator;
private final Function<T, Iterator<R>> mapperCallback;

protected Iterator<R> results;
protected Iterator<R> batchIterator;

public FlatMapperIterator(Iterator<T> origin,
Function<T, Iterator<R>> mapper) {
this.originIterator = origin;
this.mapperCallback = mapper;
this.results = null;
this.batchIterator = null;
}

@Override
protected Iterator<?> originIterator() {
public void close() throws Exception {
this.resetBatchIterator();
super.close();
}

@Override
protected Iterator<T> originIterator() {
return this.originIterator;
}

@Override
protected final boolean fetch() {
if (this.results != null && this.fetchMapped()) {
if (this.batchIterator != null && this.fetchFromBatch()) {
return true;
}

while (this.originIterator.hasNext()) {
T next = this.originIterator.next();
assert this.results == null;
this.results = this.mapperCallback.apply(next);
if (this.results != null && this.fetchMapped()) {
assert this.batchIterator == null;
// Do fetch
this.batchIterator = this.mapperCallback.apply(next);
if (this.batchIterator != null && this.fetchFromBatch()) {
return true;
}
}
return false;
}

protected boolean fetchMapped() {
E.checkNotNull(this.results, "mapper results");
while (this.results.hasNext()) {
R result = this.results.next();
protected boolean fetchFromBatch() {
E.checkNotNull(this.batchIterator, "mapper results");
while (this.batchIterator.hasNext()) {
R result = this.batchIterator.next();
if (result != null) {
assert this.current == none();
this.current = result;
return true;
}
}
this.results = null;
this.resetBatchIterator();
return false;
}

protected final void resetBatchIterator() {
if (this.batchIterator == null) {
return;
}
close(this.batchIterator);
this.batchIterator = null;
}
}
80 changes: 80 additions & 0 deletions src/main/java/com/baidu/hugegraph/iterator/ListIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.iterator;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import com.baidu.hugegraph.util.InsertionOrderUtil;

public class ListIterator<T> extends WrappedIterator<T> {

private final Iterator<T> originIterator;
private final Iterator<T> resultsIterator;
private final Collection<T> results;

public ListIterator(long capacity, Iterator<T> origin) {
List<T> results = InsertionOrderUtil.newList();
while (origin.hasNext()) {
if (capacity >= 0L && results.size() >= capacity) {
throw new IllegalArgumentException(
"The iterator exceeded capacity " + capacity);
}
results.add(origin.next());
}
this.originIterator = origin;
this.results = Collections.unmodifiableList(results);
this.resultsIterator = this.results.iterator();
}

public ListIterator(Collection<T> origin) {
this.originIterator = origin.iterator();
this.results = origin instanceof List ?
Collections.unmodifiableList((List<T>) origin) :
Collections.unmodifiableCollection(origin);
this.resultsIterator = this.results.iterator();
}

@Override
public void remove() {
this.resultsIterator.remove();
}

public Collection<T> list() {
return this.results;
}

@Override
protected boolean fetch() {
assert this.current == none();
if (!this.resultsIterator.hasNext()) {
return false;
}
this.current = this.resultsIterator.next();
return true;
}

@Override
protected Iterator<T> originIterator() {
return this.originIterator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public MapperIterator(Iterator<T> origin, Function<T, R> mapper) {
}

@Override
protected Iterator<?> originIterator() {
protected Iterator<T> originIterator() {
return this.originIterator;
}

Expand Down
Loading

0 comments on commit 4bd64ce

Please sign in to comment.