Skip to content

Commit

Permalink
chore: Add missing create method to javadsl Graph (#1230)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Mar 24, 2024
1 parent f71c083 commit c55837e
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
Expand All @@ -28,10 +29,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class GraphDslTest extends StreamTest {
public GraphDslTest() {
Expand Down Expand Up @@ -187,6 +185,94 @@ public void demonstrateMatValue() throws Exception {
// #graph-dsl-matvalue-cycle
}

@Test
public void beAbleToUseMergeSortedWithGraphDSL() throws Exception {
final Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7, 9));
final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8, 10));

final Source<Integer, NotUsed> source =
Source.fromGraph(
GraphDSL.create(
sourceA,
sourceB,
Keep.none(),
(builder, a, b) -> {
final FanInShape2<Integer, Integer, Integer> mergeSorted =
builder.add(MergeSorted.create());
builder.from(a).toInlet(mergeSorted.in0());
builder.from(b).toInlet(mergeSorted.in1());
return SourceShape.of(mergeSorted.out());
}));
final List<Integer> result =
source.runWith(Sink.seq(), system).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result);
}

@Test
public void beAbleToUseOrElseWithGraphDSL() throws Exception {
final Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7, 9));
final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8, 10));
final Source<Integer, NotUsed> source =
Source.fromGraph(
GraphDSL.create(
sourceA,
sourceB,
Keep.none(),
(builder, a, b) -> {
final UniformFanInShape<Integer, Integer> orElse = builder.add(OrElse.create());
builder.from(a).toInlet(orElse.in(0));
builder.from(b).toInlet(orElse.in(1));
return SourceShape.of(orElse.out());
}));
final List<Integer> result =
source.runWith(Sink.seq(), system).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 3, 5, 7, 9), result);
}

@Test
public void beAbleToUseWireTapWithGraphDSL() throws Exception {
final Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<Integer, CompletionStage<Done>> sink = Sink.ignore();
final Source<Integer, NotUsed> source =
Source.fromGraph(
GraphDSL.create(
sourceA,
sink,
Keep.none(),
(builder, sourceShape, sinkShape) -> {
final FanOutShape2<Integer, Integer, Integer> wireTap =
builder.add(WireTap.create());
builder.from(sourceShape).toInlet(wireTap.in());
builder.from(wireTap.out1()).to(sinkShape);
return SourceShape.of(wireTap.out0());
}));
final List<Integer> result =
source.runWith(Sink.seq(), system).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
}

@Test
public void beAbleToUseInterleaveWithGraphDSL() throws Exception {
final Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
final Source<Integer, NotUsed> source =
Source.fromGraph(
GraphDSL.create(
sourceA,
sourceB,
Keep.none(),
(builder, a, b) -> {
final UniformFanInShape<Integer, Integer> interleave =
builder.add(Interleave.create(2, 2));
builder.from(a).toInlet(interleave.in(0));
builder.from(b).toInlet(interleave.in(1));
return SourceShape.of(interleave.out());
}));
final List<Integer> result =
source.runWith(Sink.seq(), system).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 10, 20, 3, 4, 30, 40), result);
}

@Test
public void beAbleToConstructClosedGraphFromList() throws Exception {
// #graph-from-list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class GraphMergeSortedSpec extends TwoStreamsSetup with ScalaCheckPropertyChecks
override type Outputs = Int

override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture {
val merge = b.add(new MergeSorted[Outputs])
val merge = b.add(MergeSorted[Outputs]())

override def left: Inlet[Outputs] = merge.in0
override def right: Inlet[Outputs] = merge.in1
Expand Down
102 changes: 102 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.apache.pekko.stream.javadsl

import java.util
import java.util.Comparator

import scala.annotation.unchecked.uncheckedVariance

Expand Down Expand Up @@ -221,6 +222,82 @@ object Broadcast {

}

/**
* Merge two pre-sorted streams such that the resulting stream is sorted.
*
* '''Emits when''' both inputs have an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
object MergeSorted {

/**
* Create a new `MergeSorted` operator with the specified input type.
*/
def create[T <: Comparable[T]](): Graph[FanInShape2[T, T, T], NotUsed] = scaladsl.MergeSorted[T]()

/**
* Create a new `MergeSorted` operator with the specified input type.
*/
def create[T](comparator: Comparator[T]): Graph[FanInShape2[T, T, T], NotUsed] = {
implicit val ord: Ordering[T] = Ordering.comparatorToOrdering(comparator)
scaladsl.MergeSorted[T]()
}

}

/**
* Takes two streams and passes the first through, the secondary stream is only passed
* through if the primary stream completes without passing any elements through. When
* the first element is passed through from the primary the secondary is cancelled.
* Both incoming streams are materialized when the operator is materialized.
*
* On errors the operator is failed regardless of source of the error.
*
* '''Emits when''' element is available from primary stream or the primary stream closed without emitting any elements and an element
* is available from the secondary stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels
*/
object OrElse {

/**
* Create a new `OrElse` operator with the specified input type.
*/
def create[T](): Graph[UniformFanInShape[T, T], NotUsed] = scaladsl.OrElse[T]()
}

/**
* Fan-out the stream to two output streams - a 'main' and a 'tap' one. Each incoming element is emitted
* to the 'main' output; elements are also emitted to the 'tap' output if there is demand;
* otherwise they are dropped.
*
* '''Emits when''' element is available and demand exists from the 'main' output; the element will
* also be sent to the 'tap' output if there is demand.
*
* '''Backpressures when''' the 'main' output backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' the 'main' output cancels
*/
object WireTap {

/**
* Create a new `WireTap` operator with the specified output type.
*/
def create[T](): Graph[FanOutShape2[T, T, T], NotUsed] = scaladsl.WireTap[T]()
}

/**
* Fan-out the stream to several streams. emitting an incoming upstream element to one downstream consumer according
* to the partitioner function applied to the element
Expand Down Expand Up @@ -603,6 +680,31 @@ object MergeSequence {

}

object Interleave {

/**
* Create a new `Interleave` with the specified number of input ports and given size of elements
* to take from each input.
*
* @param inputPorts number of input ports
* @param segmentSize number of elements to send downstream before switching to next input port
* @param eagerClose if true, interleave completes upstream if any of its upstream completes.
*/
def create[T](
inputPorts: Int, segmentSize: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], NotUsed] =
scaladsl.Interleave(inputPorts, segmentSize, eagerClose)

/**
* Create a new `Interleave` with the specified number of input ports and given size of elements
* to take from each input, with `eagerClose` set to false.
*
* @param inputPorts number of input ports
* @param segmentSize number of elements to send downstream before switching to next input port
*/
def create[T](inputPorts: Int, segmentSize: Int): Graph[UniformFanInShape[T, T], NotUsed] =
create(inputPorts, segmentSize, eagerClose = false)
}

object GraphDSL extends GraphCreate {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3500,7 +3500,7 @@ trait FlowOps[+Out, +Mat] {
protected def mergeSortedGraph[U >: Out, M](that: Graph[SourceShape[U], M])(
implicit ord: Ordering[U]): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(new MergeSorted[U])
val merge = b.add(MergeSorted[U]())
r ~> merge.in1
FlowShape(merge.in0, merge.out)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,14 @@ final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerCl
override def toString = "Interleave"
}

object MergeSorted {

/**
* Create a new `MergeSorted`, @see [[MergeSorted]]
*/
def apply[T: Ordering](): MergeSorted[T] = new MergeSorted[T]()
}

/**
* Merge two pre-sorted streams such that the resulting stream is sorted.
*
Expand Down Expand Up @@ -1368,7 +1376,7 @@ object OrElse {
/**
* @see [[OrElse]]
*/
def apply[T]() = singleton.asInstanceOf[OrElse[T]]
def apply[T](): OrElse[T] = singleton.asInstanceOf[OrElse[T]]
}

/**
Expand Down

0 comments on commit c55837e

Please sign in to comment.