Skip to content

Commit

Permalink
fix BatchMapperIterator stopped when fetched none in the middle batch (
Browse files Browse the repository at this point in the history
…#64)

Change-Id: I658d8e95a68c8a9494efa98e88f350cf6d65b021
  • Loading branch information
javeme authored Feb 25, 2021
1 parent c80f959 commit cde7763
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ protected final boolean fetch() {
return true;
}

List<T> list = this.nextBatch();
if (!list.isEmpty()) {
assert this.batchIterator == null;
List<T> batch = this.nextBatch();
assert this.batchIterator == null;
while (!batch.isEmpty()) {
// Do fetch
this.batchIterator = this.mapperCallback.apply(list);
this.batchIterator = this.mapperCallback.apply(batch);
if (this.batchIterator != null && this.fetchFromBatch()) {
return true;
}
// Try next batch
batch = this.nextBatch();
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public boolean hasNext() {
public R next() {
if (this.current == none()) {
this.fetch();
}
if (this.current == none()) {
throw new NoSuchElementException();
if (this.current == none()) {
throw new NoSuchElementException();
}
}
R current = this.current;
this.current = none();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.junit.Test;
Expand Down Expand Up @@ -235,6 +236,42 @@ public void testMapperReturnNullThenHasNext() {
});
Assert.assertFalse(results.hasNext());
Assert.assertFalse(results.hasNext());

AtomicInteger count1 = new AtomicInteger(0);
results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
if (count1.incrementAndGet() == 1) {
return null;
}
return batch.iterator();
});
Assert.assertTrue(results.hasNext());
Assert.assertEquals(5, results.next());
Assert.assertEquals(6, results.next());
Assert.assertFalse(results.hasNext());

AtomicInteger count2 = new AtomicInteger(0);
results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
if (count2.incrementAndGet() == 2) {
return null;
}
return batch.iterator();
});
Assert.assertTrue(results.hasNext());
Assert.assertEquals(4, results.next());
Assert.assertEquals(6, results.next());
Assert.assertFalse(results.hasNext());

AtomicInteger count3 = new AtomicInteger(0);
results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
if (count3.incrementAndGet() == 3) {
return null;
}
return batch.iterator();
});
Assert.assertTrue(results.hasNext());
Assert.assertEquals(4, results.next());
Assert.assertEquals(5, results.next());
Assert.assertFalse(results.hasNext());
}

@Test
Expand All @@ -252,6 +289,53 @@ public void testMapperReturnNullThenNext() {
});
}

@Test
public void testMapperReturnEmptyThenHasNext() {
Iterator<Integer> results;

results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
return Collections.emptyIterator();
});
Assert.assertFalse(results.hasNext());
Assert.assertFalse(results.hasNext());

AtomicInteger count1 = new AtomicInteger(0);
results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
if (count1.incrementAndGet() == 1) {
return Collections.emptyIterator();
}
return batch.iterator();
});
Assert.assertTrue(results.hasNext());
Assert.assertEquals(5, results.next());
Assert.assertEquals(6, results.next());
Assert.assertFalse(results.hasNext());

AtomicInteger count2 = new AtomicInteger(0);
results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
if (count2.incrementAndGet() == 2) {
return Collections.emptyIterator();
}
return batch.iterator();
});
Assert.assertTrue(results.hasNext());
Assert.assertEquals(4, results.next());
Assert.assertEquals(6, results.next());
Assert.assertFalse(results.hasNext());

AtomicInteger count3 = new AtomicInteger(0);
results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
if (count3.incrementAndGet() == 3) {
return Collections.emptyIterator();
}
return batch.iterator();
});
Assert.assertTrue(results.hasNext());
Assert.assertEquals(4, results.next());
Assert.assertEquals(5, results.next());
Assert.assertFalse(results.hasNext());
}

@Test
public void testClose() throws Exception {
CloseableItor<Integer> vals = new CloseableItor<>(DATA1.iterator());
Expand Down

0 comments on commit cde7763

Please sign in to comment.