Skip to content

Commit

Permalink
Fix groupbykey test in JavaAPISuite of streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent ec8cc3e commit e687f21
Showing 1 changed file with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {

public void equalIterator(Iterator<?> a, Iterator<?> b) {
while (a.hasNext() && b.hasNext()) {
Assert.assertEquals(a.next(), b.next());
}
Assert.assertEquals(a.hasNext(), b.hasNext());
}

@SuppressWarnings("unchecked")
@Test
public void testCount() {
Expand Down Expand Up @@ -1018,9 +1025,22 @@ public void testPairGroupByKey() {

JavaPairDStream<String, Iterator<String>> grouped = pairStream.groupByKey();
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected, result);
List<List<Tuple2<String, Iterator<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected.size(), result.size());
Iterator<List<Tuple2<String, Iterator<String>>>> resultItr = result.iterator();
Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
while (resultItr.hasNext() && expectedItr.hasNext()) {
Iterator<Tuple2<String, Iterator<String>>> resultElements = resultItr.next().iterator();
Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
while (resultElements.hasNext() && expectedElements.hasNext()) {
Tuple2<String, Iterator<String>> resultElement = resultElements.next();
Tuple2<String, List<String>> expectedElement = expectedElements.next();
Assert.assertEquals(expectedElement._1(), resultElement._1());
equalIterator(expectedElement._2().iterator(), resultElement._2());
}
Assert.assertEquals(resultElements.hasNext(), expectedItr.hasNext());
}
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit e687f21

Please sign in to comment.