diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index 0a5403dbbf..21a4a3c2a7 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -105,10 +105,7 @@ public void publishAndConsume() throws Exception { assertEquals( input, - result - .toCompletableFuture() - .get(3, TimeUnit.SECONDS) - .stream() + result.toCompletableFuture().get(3, TimeUnit.SECONDS).stream() .map(m -> m.bytes().utf8String()) .collect(Collectors.toList())); } @@ -278,10 +275,7 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { assertEquals( input, - result - .toCompletableFuture() - .get(3, TimeUnit.SECONDS) - .stream() + result.toCompletableFuture().get(3, TimeUnit.SECONDS).stream() .map(m -> m.bytes().utf8String()) .collect(Collectors.toList())); } @@ -335,10 +329,7 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { assertEquals( input, - result2 - .toCompletableFuture() - .get(10, TimeUnit.SECONDS) - .stream() + result2.toCompletableFuture().get(10, TimeUnit.SECONDS).stream() .map(m -> m.message().bytes().utf8String()) .collect(Collectors.toList())); diff --git a/cassandra/src/test/java/docs/javadsl/CassandraSourceTest.java b/cassandra/src/test/java/docs/javadsl/CassandraSourceTest.java index 7ca7d31da1..6733afd0ba 100644 --- a/cassandra/src/test/java/docs/javadsl/CassandraSourceTest.java +++ b/cassandra/src/test/java/docs/javadsl/CassandraSourceTest.java @@ -118,9 +118,7 @@ public void streamStatementResult() throws Exception { assertEquals( IntStream.range(1, 103).boxed().collect(Collectors.toSet()), - rows.toCompletableFuture() - .get(3, TimeUnit.SECONDS) - .stream() + rows.toCompletableFuture().get(3, TimeUnit.SECONDS).stream() .map(r -> r.getInt("id")) .collect(Collectors.toSet())); } @@ -149,10 +147,7 @@ public void flowInputValues() throws Exception { List resultToAssert = result.toCompletableFuture().get(); Set found = - session - .execute("select * from akka_stream_java_test.test") - .all() - .stream() + session.execute("select * from akka_stream_java_test.test").all().stream() .map(r -> r.getInt("id")) .collect(Collectors.toSet()); @@ -194,10 +189,7 @@ public void flowBatchInputValues() throws Exception { Set resultToAssert = result.toCompletableFuture().get().stream().map(ti -> ti.cc).collect(Collectors.toSet()); Set found = - session - .execute("select * from akka_stream_java_test.test_batch") - .all() - .stream() + session.execute("select * from akka_stream_java_test.test_batch").all().stream() .map(r -> r.getInt("cc")) .collect(Collectors.toSet()); @@ -231,10 +223,7 @@ public void sinkInputValues() throws Exception { result.toCompletableFuture().get(); Set found = - session - .execute("select * from akka_stream_java_test.test") - .all() - .stream() + session.execute("select * from akka_stream_java_test.test").all().stream() .map(r -> r.getInt("id")) .collect(Collectors.toSet()); assertEquals(found, IntStream.range(1, 10).boxed().collect(Collectors.toSet())); diff --git a/couchbase/src/main/mima-filters/1.0.x.backwards.excludes b/couchbase/src/main/mima-filters/1.0.x.backwards.excludes new file mode 100644 index 0000000000..d4b9d8ef5c --- /dev/null +++ b/couchbase/src/main/mima-filters/1.0.x.backwards.excludes @@ -0,0 +1,2 @@ +# Allow changes to impl +ProblemFilters.exclude[Problem]("akka.stream.alpakka.couchbase.impl.*") diff --git a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/SetupStage.scala b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/SetupStage.scala deleted file mode 100644 index b94abdf40e..0000000000 --- a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/SetupStage.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.couchbase.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -/** - * Internal API. - */ -@InternalApi private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow)(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -/** - * Internal API. - */ -@InternalApi private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -/** - * Internal API. - */ -@InternalApi private[couchbase] object Setup { - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala index b9b4cb5515..1ef62ce32b 100644 --- a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala +++ b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala @@ -4,7 +4,6 @@ package akka.stream.alpakka.couchbase.scaladsl import akka.NotUsed -import akka.stream.alpakka.couchbase.impl.Setup import akka.stream.alpakka.couchbase._ import akka.stream.scaladsl.Flow import com.couchbase.client.java.document.{Document, JsonDocument} @@ -18,8 +17,8 @@ object CouchbaseFlow { * Create a flow to query Couchbase for by `id` and emit [[com.couchbase.client.java.document.JsonDocument JsonDocument]]s. */ def fromId(sessionSettings: CouchbaseSessionSettings, bucketName: String): Flow[String, JsonDocument, NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[String] .mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */ ))(materializer.system.dispatcher)) @@ -33,8 +32,8 @@ object CouchbaseFlow { def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, bucketName: String, target: Class[T]): Flow[String, T, NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[String] .mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */, target))(materializer.system.dispatcher)) @@ -48,8 +47,8 @@ object CouchbaseFlow { def upsert(sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[JsonDocument, JsonDocument, NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[JsonDocument] .mapAsync(writeSettings.parallelism)( @@ -64,8 +63,8 @@ object CouchbaseFlow { def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[T, T, NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[T] .mapAsync(writeSettings.parallelism)( @@ -81,8 +80,8 @@ object CouchbaseFlow { def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[T] .mapAsync(writeSettings.parallelism)( @@ -105,8 +104,8 @@ object CouchbaseFlow { def delete(sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[String, String, NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[String] .mapAsync(writeSettings.parallelism)( @@ -126,8 +125,8 @@ object CouchbaseFlow { def deleteWithResult(sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[String, CouchbaseDeleteResult, NotUsed] = - Setup - .flow { materializer => _ => + Flow + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[String] .mapAsync(writeSettings.parallelism)( diff --git a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala index 14f585726c..84fdee027b 100644 --- a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala +++ b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala @@ -5,7 +5,6 @@ package akka.stream.alpakka.couchbase.scaladsl import akka.NotUsed -import akka.stream.alpakka.couchbase.impl.Setup import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings} import akka.stream.scaladsl.Source import com.couchbase.client.java.document.json.JsonObject @@ -22,8 +21,8 @@ object CouchbaseSource { def fromStatement(sessionSettings: CouchbaseSessionSettings, statement: Statement, bucketName: String): Source[JsonObject, NotUsed] = - Setup - .source { materializer => _ => + Source + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Source .fromFuture(session.map(_.streamedQuery(statement))(materializer.system.dispatcher)) @@ -37,8 +36,8 @@ object CouchbaseSource { def fromN1qlQuery(sessionSettings: CouchbaseSessionSettings, query: N1qlQuery, bucketName: String): Source[JsonObject, NotUsed] = - Setup - .source { materializer => _ => + Source + .setup { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Source .fromFuture(session.map(_.streamedQuery(query))(materializer.system.dispatcher)) diff --git a/dynamodb/src/main/mima-filters/1.0.x.backwards.excludes b/dynamodb/src/main/mima-filters/1.0.x.backwards.excludes new file mode 100644 index 0000000000..6f5cf952c4 --- /dev/null +++ b/dynamodb/src/main/mima-filters/1.0.x.backwards.excludes @@ -0,0 +1,2 @@ +# Allow changes to impl +ProblemFilters.exclude[Problem]("akka.stream.alpakka.dynamodb.impl.*") diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/SetupStage.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/SetupStage.scala deleted file mode 100644 index 413923adab..0000000000 --- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/SetupStage.scala +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.dynamodb.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink)(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow)(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[dynamodb] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala index aae310e319..9c191c1775 100644 --- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala +++ b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.dynamodb.scaladsl import akka.NotUsed import akka.stream.{ActorMaterializer, Attributes, Materializer} -import akka.stream.alpakka.dynamodb.impl.{Paginator, Setup} +import akka.stream.alpakka.dynamodb.impl.Paginator import akka.stream.alpakka.dynamodb.{AwsOp, AwsPagedOp, DynamoAttributes, DynamoClientExt} import akka.stream.scaladsl.{Flow, Sink, Source} @@ -21,17 +21,17 @@ object DynamoDb { * Create a Flow that emits a response for every request to DynamoDB. */ def flow[Op <: AwsOp]: Flow[Op, Op#B, NotUsed] = - Setup - .flow(clientFlow[Op]) + Flow + .setup(clientFlow[Op]) .mapMaterializedValue(_ => NotUsed) /** * Create a Source that will emit potentially multiple responses for a given request. */ def source(op: AwsPagedOp): Source[op.B, NotUsed] = - Setup - .source { mat => attr => - Paginator.source(clientFlow(mat)(attr), op) + Source + .setup { (mat, attr) => + Paginator.source(clientFlow(mat, attr), op) } .mapMaterializedValue(_ => NotUsed) @@ -47,7 +47,7 @@ object DynamoDb { def single(op: AwsOp)(implicit mat: Materializer): Future[op.B] = source(op).runWith(Sink.head) - private def clientFlow[Op <: AwsOp](mat: ActorMaterializer)(attr: Attributes) = + private def clientFlow[Op <: AwsOp](mat: ActorMaterializer, attr: Attributes) = attr .get[DynamoAttributes.Client] .map(_.client) diff --git a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java index f4d957343f..b9ef06a869 100644 --- a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java +++ b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java @@ -588,8 +588,7 @@ public void testUsingSearchParams() throws Exception { assertEquals( docs.size(), - result - .stream() + result.stream() .filter( d -> { return d.a != null && d.b == null; diff --git a/google-cloud-pub-sub-grpc/src/main/mima-filters/1.0.x.backwards.excludes b/google-cloud-pub-sub-grpc/src/main/mima-filters/1.0.x.backwards.excludes new file mode 100644 index 0000000000..342808320d --- /dev/null +++ b/google-cloud-pub-sub-grpc/src/main/mima-filters/1.0.x.backwards.excludes @@ -0,0 +1,2 @@ +# Allow changes to impl +ProblemFilters.exclude[Problem]("akka.stream.alpakka.googlecloud.pubsub.grpc.impl.*") diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/SetupStage.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/SetupStage.scala deleted file mode 100644 index 1b1ffbddf2..0000000000 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/SetupStage.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.googlecloud.pubsub.grpc.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.javadsl -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[grpc] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def createSink[T, M](factory: ActorMaterializer => Attributes => javadsl.Sink[T, M]): javadsl.Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(mat => attr => factory(mat)(attr).asScala)).asJava - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def createFlow[T, U, M]( - factory: ActorMaterializer => Attributes => javadsl.Flow[T, U, M] - ): javadsl.Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(mat => attr => factory(mat)(attr).asScala)).asJava - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) - - def createSource[T, M]( - factory: ActorMaterializer => Attributes => javadsl.Source[T, M] - ): javadsl.Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(mat => attr => factory(mat)(attr).asScala)).asJava -} diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala index 059e5d37ee..af864d5654 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala @@ -8,16 +8,11 @@ import java.time.Duration import java.util.concurrent.{CompletableFuture, CompletionStage} import akka.actor.Cancellable -import akka.dispatch.ExecutionContexts -import akka.stream.alpakka.googlecloud.pubsub.grpc.impl.Setup import akka.stream.{ActorMaterializer, Attributes} import akka.stream.javadsl.{Flow, Keep, Sink, Source} import akka.{Done, NotUsed} import com.google.pubsub.v1._ -import scala.compat.java8.FutureConverters._ -import scala.concurrent.Future - /** * Google Pub/Sub Akka Stream operator factory. */ @@ -30,13 +25,13 @@ object GooglePubSub { * @param parallelism controls how many messages can be in-flight at any given time */ def publish(parallelism: Int): Flow[PublishRequest, PublishResponse, NotUsed] = - Setup - .createFlow { implicit mat => implicit attr => + Flow + .setup { (mat, attr) => Flow .create[PublishRequest] - .mapAsyncUnordered(parallelism, javaFunction(publisher().client.publish)) + .mapAsyncUnordered(parallelism, japiFunction(publisher(mat, attr).client.publish)) } - .mapMaterializedValue(javaFunction(_ => NotUsed)) + .mapMaterializedValue(japiFunction(_ => NotUsed)) /** * Create a source that emits messages for a given subscription. @@ -48,8 +43,8 @@ object GooglePubSub { */ def subscribe(request: StreamingPullRequest, pollInterval: Duration): Source[ReceivedMessage, CompletableFuture[Cancellable]] = - Setup - .createSource { implicit mat => implicit attr => + Source + .setup { (mat, attr) => val cancellable = new CompletableFuture[Cancellable]() val subsequentRequest = request.toBuilder @@ -57,21 +52,21 @@ object GooglePubSub { .setStreamAckDeadlineSeconds(0) .build() - subscriber().client + subscriber(mat, attr).client .streamingPull( Source .single(request) .concat( Source .tick(pollInterval, pollInterval, subsequentRequest) - .mapMaterializedValue(javaFunction(cancellable.complete)) + .mapMaterializedValue(japiFunction(cancellable.complete)) ) ) - .mapConcat(javaFunction(_.getReceivedMessagesList)) - .mapMaterializedValue(javaFunction(_ => cancellable)) + .mapConcat(japiFunction(_.getReceivedMessagesList)) + .mapMaterializedValue(japiFunction(_ => cancellable)) } - .mapMaterializedValue(javaFunction(flattenFutureCs)) - .mapMaterializedValue(javaFunction(_.toCompletableFuture)) + .mapMaterializedValue(japiFunction(flattenCs)) + .mapMaterializedValue(japiFunction(_.toCompletableFuture)) /** * Create a sink that accepts consumed message acknowledgements. @@ -81,36 +76,36 @@ object GooglePubSub { * @param parallelism controls how many acknowledgements can be in-flight at any given time */ def acknowledge(parallelism: Int): Sink[AcknowledgeRequest, CompletionStage[Done]] = - Setup - .createSink { implicit mat => implicit attr => + Sink + .setup { (mat, attr) => Flow .create[AcknowledgeRequest] - .mapAsyncUnordered(parallelism, javaFunction(subscriber().client.acknowledge)) + .mapAsyncUnordered(parallelism, japiFunction(subscriber(mat, attr).client.acknowledge)) .toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]]) } - .mapMaterializedValue(javaFunction(flattenFutureCs)) + .mapMaterializedValue(japiFunction(flattenCs)) /** * Helper for creating akka.japi.function.Function instances from Scala * functions as Scala 2.11 does not know about SAMs. */ - private def javaFunction[A, B](f: A => B): akka.japi.function.Function[A, B] = + private def japiFunction[A, B](f: A => B): akka.japi.function.Function[A, B] = new akka.japi.function.Function[A, B]() { override def apply(a: A): B = f(a) } - private def flattenFutureCs[T](f: Future[CompletionStage[T]]): CompletionStage[T] = - f.map(_.toScala)(ExecutionContexts.sameThreadExecutionContext) - .flatMap(identity)(ExecutionContexts.sameThreadExecutionContext) - .toJava + private def flattenCs[T](f: CompletionStage[_ <: CompletionStage[T]]): CompletionStage[T] = + f.thenCompose(new java.util.function.Function[CompletionStage[T], CompletionStage[T]] { + override def apply(t: CompletionStage[T]): CompletionStage[T] = t + }) - private def publisher()(implicit mat: ActorMaterializer, attr: Attributes) = + private def publisher(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Publisher] .map(_.publisher) .getOrElse(GrpcPublisherExt()(mat.system).publisher) - private def subscriber()(implicit mat: ActorMaterializer, attr: Attributes) = + private def subscriber(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Subscriber] .map(_.subscriber) diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala index bf8abf9c84..93ee154e9a 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala @@ -7,7 +7,6 @@ package akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl import akka.actor.Cancellable import akka.dispatch.ExecutionContexts import akka.stream.{ActorMaterializer, Attributes} -import akka.stream.alpakka.googlecloud.pubsub.grpc.impl.Setup import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.{Done, NotUsed} import com.google.pubsub.v1.pubsub._ @@ -27,10 +26,10 @@ object GooglePubSub { * @param parallelism controls how many messages can be in-flight at any given time */ def publish(parallelism: Int): Flow[PublishRequest, PublishResponse, NotUsed] = - Setup - .flow { implicit mat => implicit attr => + Flow + .setup { (mat, attr) => Flow[PublishRequest] - .mapAsyncUnordered(parallelism)(publisher().client.publish) + .mapAsyncUnordered(parallelism)(publisher(mat, attr).client.publish) } .mapMaterializedValue(_ => NotUsed) @@ -46,15 +45,15 @@ object GooglePubSub { request: StreamingPullRequest, pollInterval: FiniteDuration ): Source[ReceivedMessage, Future[Cancellable]] = - Setup - .source { implicit mat => implicit attr => + Source + .setup { (mat, attr) => val cancellable = Promise[Cancellable] val subsequentRequest = request .withSubscription("") .withStreamAckDeadlineSeconds(0) - subscriber().client + subscriber(mat, attr).client .streamingPull( Source .single(request) @@ -78,21 +77,21 @@ object GooglePubSub { * @param parallelism controls how many acknowledgements can be in-flight at any given time */ def acknowledge(parallelism: Int): Sink[AcknowledgeRequest, Future[Done]] = - Setup - .sink { implicit mat => implicit attr => + Sink + .setup { (mat, attr) => Flow[AcknowledgeRequest] - .mapAsyncUnordered(parallelism)(subscriber().client.acknowledge) + .mapAsyncUnordered(parallelism)(subscriber(mat, attr).client.acknowledge) .toMat(Sink.ignore)(Keep.right) } .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext)) - private def publisher()(implicit mat: ActorMaterializer, attr: Attributes) = + private def publisher(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Publisher] .map(_.publisher) .getOrElse(GrpcPublisherExt()(mat.system).publisher) - private def subscriber()(implicit mat: ActorMaterializer, attr: Attributes) = + private def subscriber(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Subscriber] .map(_.subscriber) diff --git a/google-fcm/src/main/mima-filters/1.0.x.backwards.excludes b/google-fcm/src/main/mima-filters/1.0.x.backwards.excludes new file mode 100644 index 0000000000..3e3c3138c7 --- /dev/null +++ b/google-fcm/src/main/mima-filters/1.0.x.backwards.excludes @@ -0,0 +1,2 @@ +# Allow changes to impl +ProblemFilters.exclude[Problem]("akka.stream.alpakka.google.firebase.fcm.impl.*") diff --git a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala b/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala index 0f6816ae02..804e5ab296 100644 --- a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala +++ b/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala @@ -19,24 +19,26 @@ private[fcm] object FcmFlows { private[fcm] def fcmWithData[T](conf: FcmSettings, sender: FcmSender): Flow[(FcmNotification, T), (FcmResponse, T), NotUsed] = - Setup - .flow { implicit materializer => _ => + Flow + .setup { (materializer, _) => import materializer.executionContext val http = Http()(materializer.system) val session: GoogleSession = new GoogleSession(conf.clientEmail, conf.privateKey, new GoogleTokenApi(http)) Flow[(FcmNotification, T)] .mapAsync(conf.maxConcurrentConnections)( in => - session.getToken().flatMap { token => - sender.send(conf.projectId, token, http, FcmSend(conf.isTest, in._1)).zip(Future.successful(in._2)) + session.getToken()(materializer).flatMap { token => + sender + .send(conf.projectId, token, http, FcmSend(conf.isTest, in._1))(materializer) + .zip(Future.successful(in._2)) } ) } .mapMaterializedValue(_ => NotUsed) private[fcm] def fcm(conf: FcmSettings, sender: FcmSender): Flow[FcmNotification, FcmResponse, NotUsed] = - Setup - .flow { implicit materializer => _ => + Flow + .setup { (materializer, _) => import materializer.executionContext val http = Http()(materializer.system) val session: GoogleSession = new GoogleSession(conf.clientEmail, conf.privateKey, new GoogleTokenApi(http)) @@ -44,8 +46,8 @@ private[fcm] object FcmFlows { Flow[FcmNotification] .mapAsync(conf.maxConcurrentConnections)( in => - session.getToken().flatMap { token => - sender.send(conf.projectId, token, http, FcmSend(conf.isTest, in)) + session.getToken()(materializer).flatMap { token => + sender.send(conf.projectId, token, http, FcmSend(conf.isTest, in))(materializer) } ) } diff --git a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/SetupStage.scala b/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/SetupStage.scala deleted file mode 100644 index dc5279ae72..0000000000 --- a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/SetupStage.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.google.firebase.fcm.impl - -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -private object Setup { - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - -} diff --git a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala index 50b8a1305a..6474366a12 100644 --- a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala +++ b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala @@ -18,7 +18,7 @@ import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{verify, when} import org.scalatest.concurrent.ScalaFutures -import org.scalatest.mockito.MockitoSugar +import org.scalatestplus.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import scala.concurrent.duration._ diff --git a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala index b9cf18e78a..b3ee16735f 100644 --- a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala +++ b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala @@ -17,7 +17,7 @@ import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{verify, when} import org.scalatest.concurrent.ScalaFutures -import org.scalatest.mockito.MockitoSugar +import org.scalatestplus.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import pdi.jwt.{Jwt, JwtAlgorithm} diff --git a/hdfs/src/test/java/docs/javadsl/HdfsWriterTest.java b/hdfs/src/test/java/docs/javadsl/HdfsWriterTest.java index 929b43c3c5..5c547ecb27 100644 --- a/hdfs/src/test/java/docs/javadsl/HdfsWriterTest.java +++ b/hdfs/src/test/java/docs/javadsl/HdfsWriterTest.java @@ -282,8 +282,7 @@ public void testDataWriterKafkaExample() throws Exception { assertEquals(logs, expect); JavaTestUtils.verifyOutputFileSize(fs, logs); assertEquals( - JavaTestUtils.readLogs(fs, logs) - .stream() + JavaTestUtils.readLogs(fs, logs).stream() .map(string -> string.split("\n")) .flatMap(Arrays::stream) .collect(Collectors.toList()), diff --git a/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsAckConnectorsTest.java b/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsAckConnectorsTest.java index 714aef73ac..820c8b0bfc 100644 --- a/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsAckConnectorsTest.java +++ b/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsAckConnectorsTest.java @@ -172,8 +172,7 @@ public void publishAndConsumeJmsTextMessagesWithHeaders() throws Exception { JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test")); List msgsIn = - createTestMessageList() - .stream() + createTestMessageList().stream() .map(jmsTextMessage -> jmsTextMessage.withHeader(JmsType.create("type"))) .map( jmsTextMessage -> @@ -252,8 +251,7 @@ public void publishJmsTextMessagesWithPropertiesAndConsumeThemWithASelector() th .withSelector("IsOdd = TRUE")); List oddMsgsIn = - msgsIn - .stream() + msgsIn.stream() .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1) .collect(Collectors.toList()); assertEquals(5, oddMsgsIn.size()); diff --git a/jms/src/test/java/docs/javadsl/JmsBufferedAckConnectorsTest.java b/jms/src/test/java/docs/javadsl/JmsBufferedAckConnectorsTest.java index 59d2fc19a5..d3c0b3f281 100644 --- a/jms/src/test/java/docs/javadsl/JmsBufferedAckConnectorsTest.java +++ b/jms/src/test/java/docs/javadsl/JmsBufferedAckConnectorsTest.java @@ -178,8 +178,7 @@ public void publishAndConsumeJmsTextMessagesWithHeaders() throws Exception { JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test")); List msgsIn = - createTestMessageList() - .stream() + createTestMessageList().stream() .map(jmsTextMessage -> jmsTextMessage.withHeader(JmsType.create("type"))) .map( jmsTextMessage -> @@ -257,8 +256,7 @@ public void publishJmsTextMessagesWithPropertiesAndConsumeThemWithASelector() th .withSelector("IsOdd = TRUE")); List oddMsgsIn = - msgsIn - .stream() + msgsIn.stream() .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1) .collect(Collectors.toList()); assertEquals(5, oddMsgsIn.size()); diff --git a/jms/src/test/java/docs/javadsl/JmsConnectorsTest.java b/jms/src/test/java/docs/javadsl/JmsConnectorsTest.java index 8fd9333896..1721513def 100644 --- a/jms/src/test/java/docs/javadsl/JmsConnectorsTest.java +++ b/jms/src/test/java/docs/javadsl/JmsConnectorsTest.java @@ -336,8 +336,7 @@ public void publishAndConsumeJmsTextMessagesWithHeaders() throws Exception { // #create-messages-with-headers List msgsIn = - createTestMessageList() - .stream() + createTestMessageList().stream() .map( jmsTextMessage -> jmsTextMessage @@ -447,8 +446,7 @@ public void publishJmsTextMessagesWithPropertiesAndConsumeThemWithASelector() th // #source-with-selector List oddMsgsIn = - msgsIn - .stream() + msgsIn.stream() .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1) .collect(Collectors.toList()); assertEquals(5, oddMsgsIn.size()); @@ -648,10 +646,7 @@ public void browse() throws Exception { // #browse-source List resultText = - result - .toCompletableFuture() - .get() - .stream() + result.toCompletableFuture().get().stream() .map( message -> { try { diff --git a/jms/src/test/java/docs/javadsl/JmsTxConnectorsTest.java b/jms/src/test/java/docs/javadsl/JmsTxConnectorsTest.java index 63680af505..74cf42ed31 100644 --- a/jms/src/test/java/docs/javadsl/JmsTxConnectorsTest.java +++ b/jms/src/test/java/docs/javadsl/JmsTxConnectorsTest.java @@ -179,8 +179,7 @@ public void publishAndConsumeJmsTextMessagesWithHeaders() throws Exception { JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test")); List msgsIn = - createTestMessageList() - .stream() + createTestMessageList().stream() .map(jmsTextMessage -> jmsTextMessage.withHeader(JmsType.create("type"))) .map( jmsTextMessage -> @@ -257,8 +256,7 @@ public void publishJmsTextMessagesWithPropertiesAndConsumeThemWithASelector() th .withSelector("IsOdd = TRUE")); List oddMsgsIn = - msgsIn - .stream() + msgsIn.stream() .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1) .collect(Collectors.toList()); assertEquals(5, oddMsgsIn.size()); diff --git a/kudu/src/main/mima-filters/1.0.x.backwards.excludes b/kudu/src/main/mima-filters/1.0.x.backwards.excludes new file mode 100644 index 0000000000..b8c5819008 --- /dev/null +++ b/kudu/src/main/mima-filters/1.0.x.backwards.excludes @@ -0,0 +1,2 @@ +# Allow changes to impl +ProblemFilters.exclude[Problem]("akka.stream.alpakka.kudu.impl.*") diff --git a/kudu/src/main/scala/akka/stream/alpakka/kudu/impl/SetupStage.scala b/kudu/src/main/scala/akka/stream/alpakka/kudu/impl/SetupStage.scala deleted file mode 100644 index f0e8a2e9a9..0000000000 --- a/kudu/src/main/scala/akka/stream/alpakka/kudu/impl/SetupStage.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.kudu.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[kudu] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala b/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala index 1bddaf7e99..e73201801d 100644 --- a/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala +++ b/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.kudu.scaladsl import akka.stream.{ActorMaterializer, Attributes} import akka.stream.alpakka.kudu.{KuduAttributes, KuduClientExt, KuduTableSettings} -import akka.stream.alpakka.kudu.impl.{KuduFlowStage, Setup} +import akka.stream.alpakka.kudu.impl.KuduFlowStage import akka.stream.scaladsl.{Flow, Keep, Sink} import akka.{Done, NotUsed} @@ -27,13 +27,13 @@ object KuduTable { * Create a Flow writing elements to a Kudu table. */ def flow[A](settings: KuduTableSettings[A]): Flow[A, A, NotUsed] = - Setup - .flow { implicit mat => implicit attr => - Flow.fromGraph(new KuduFlowStage[A](settings, client())) + Flow + .setup { (mat, attr) => + Flow.fromGraph(new KuduFlowStage[A](settings, client(mat, attr))) } .mapMaterializedValue(_ => NotUsed) - private def client()(implicit mat: ActorMaterializer, attr: Attributes) = + private def client(mat: ActorMaterializer, attr: Attributes) = attr .get[KuduAttributes.Client] .map(_.client) diff --git a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java index f50c59e570..5550d21105 100644 --- a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java +++ b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java @@ -111,10 +111,7 @@ public void saveWithInsertOne() throws Exception { assertEquals( testRange, - found - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("value")) .collect(Collectors.toList())); } @@ -148,10 +145,7 @@ public void saveWithInsertMany() throws Exception { assertEquals( testRange, - found - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("value")) .collect(Collectors.toList())); } @@ -189,10 +183,7 @@ public void saveWithInsertManyWithOptions() throws Exception { assertEquals( testRange, - found - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("value")) .collect(Collectors.toList())); } @@ -236,10 +227,7 @@ public void updateWithUpdateOne() throws Exception { assertEquals( testRange.stream().map(i -> Pair.create(i, i * -1)).collect(Collectors.toList()), - found - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(d -> Pair.create(d.getInteger("value"), d.getInteger("updateValue"))) .collect(Collectors.toList())); } @@ -262,10 +250,7 @@ public void updateWithUpdateMany() throws Exception { assertEquals( testRange.stream().map(i -> Pair.create(i, 0)).collect(Collectors.toList()), - found - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(d -> Pair.create(d.getInteger("value"), d.getInteger("updateValue"))) .collect(Collectors.toList())); } diff --git a/mongodb/src/test/java/docs/javadsl/MongoSourceTest.java b/mongodb/src/test/java/docs/javadsl/MongoSourceTest.java index 5994882ec3..c6fb7d1364 100644 --- a/mongodb/src/test/java/docs/javadsl/MongoSourceTest.java +++ b/mongodb/src/test/java/docs/javadsl/MongoSourceTest.java @@ -93,9 +93,7 @@ public void streamTheResultOfASimpleMongoQuery() throws Exception { assertEquals( data, - rows.toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + rows.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("_id")) .collect(Collectors.toList())); } @@ -123,20 +121,12 @@ public void supportMultipleMaterializations() throws Exception { assertEquals( data, - source - .runWith(Sink.seq(), mat) - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + source.runWith(Sink.seq(), mat).toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("_id")) .collect(Collectors.toList())); assertEquals( data, - source - .runWith(Sink.seq(), mat) - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + source.runWith(Sink.seq(), mat).toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("_id")) .collect(Collectors.toList())); } diff --git a/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java b/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java index 2dbb757d02..5d64942f4b 100644 --- a/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java +++ b/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java @@ -128,11 +128,7 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { assertEquals( input, - unackedResult - .second() - .toCompletableFuture() - .get(5, TimeUnit.SECONDS) - .stream() + unackedResult.second().toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(m -> m.message().payload().utf8String()) .collect(Collectors.toList())); @@ -152,10 +148,7 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { assertEquals( input, - result - .toCompletableFuture() - .get(3, TimeUnit.SECONDS) - .stream() + result.toCompletableFuture().get(3, TimeUnit.SECONDS).stream() .map(m -> m.payload().utf8String()) .collect(Collectors.toList())); } diff --git a/orientdb/src/test/java/docs/javadsl/OrientDbTest.java b/orientdb/src/test/java/docs/javadsl/OrientDbTest.java index 72f8d53e11..b8849c06fd 100644 --- a/orientdb/src/test/java/docs/javadsl/OrientDbTest.java +++ b/orientdb/src/test/java/docs/javadsl/OrientDbTest.java @@ -346,8 +346,7 @@ public void accept(KafkaOffset kafkaOffset) { ODatabaseDocumentTx db = oDatabase.acquire(); db.setDatabaseOwner(new OObjectDatabaseTx(db)); ODatabaseRecordThreadLocal.instance().set(db); - messages - .stream() + messages.stream() .forEach( message -> { commitToKafka.accept(((KafkaOffset) message.passThrough())); @@ -369,8 +368,7 @@ public void accept(KafkaOffset kafkaOffset) { .get(10, TimeUnit.SECONDS); assertEquals( - messagesFromKafkas - .stream() + messagesFromKafkas.stream() .map(m -> m.getBook_title()) .sorted() .collect(Collectors.toList()), diff --git a/reference/src/main/scala/akka/stream/alpakka/reference/impl/SetupStage.scala b/reference/src/main/scala/akka/stream/alpakka/reference/impl/SetupStage.scala deleted file mode 100644 index 214243b34e..0000000000 --- a/reference/src/main/scala/akka/stream/alpakka/reference/impl/SetupStage.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.reference.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -// This will be removed from Alpakka project once it is merged to Akka. -// https://github.com/akka/akka/pull/26477 - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[reference] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala b/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala index 1bda8f0c64..ef707b85b1 100644 --- a/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala +++ b/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.reference.scaladsl import akka.actor.ActorSystem import akka.stream.Attributes import akka.{Done, NotUsed} -import akka.stream.alpakka.reference.impl.{ReferenceFlow, ReferenceSource, ReferenceWithResourceFlow, Setup} +import akka.stream.alpakka.reference.impl.{ReferenceFlow, ReferenceSource, ReferenceWithResourceFlow} import akka.stream.alpakka.reference._ import akka.stream.scaladsl.{Flow, Source} @@ -40,13 +40,12 @@ object Reference { * An implementation of a flow that needs access to materializer or attributes during materialization. */ def flowWithResource(): Flow[ReferenceWriteMessage, ReferenceWriteResult, NotUsed] = - Setup - .flow { mat => implicit attr => - implicit val sys = mat.system - Flow.fromGraph(new ReferenceWithResourceFlow(resolveResource())) + Flow + .setup { (mat, attr) => + Flow.fromGraph(new ReferenceWithResourceFlow(resolveResource(mat.system, attr))) } .mapMaterializedValue(_ => NotUsed) - private def resolveResource()(implicit sys: ActorSystem, attr: Attributes) = - attr.get[ReferenceResourceValue].map(_.resource).getOrElse(ResourceExt().resource) + private def resolveResource(sys: ActorSystem, attr: Attributes) = + attr.get[ReferenceResourceValue].map(_.resource).getOrElse(ResourceExt()(sys).resource) } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala index f75040eb5b..ec2f7359d6 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala @@ -111,18 +111,20 @@ import akka.util.ByteString ): Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed] = { val s3Headers = sse.toIndexedSeq.flatMap(_.headersFor(GetObject)) - Setup.source { implicit mat => _ => - request(s3Location, rangeOption = range, versionId = versionId, s3Headers = s3Headers) - .map(response => response.withEntity(response.entity.withoutSizeLimit)) - .mapAsync(parallelism = 1)(entityForSuccess) - .map { - case (entity, headers) => - Option((entity.dataBytes.mapMaterializedValue(_ => NotUsed), computeMetaData(headers, entity))) - } - .recover[Option[(Source[ByteString, NotUsed], ObjectMetadata)]] { - case e: S3Exception if e.code == "NoSuchKey" => None - } - } + Source + .setup { (mat, attr) => + implicit val materializer = mat + request(s3Location, rangeOption = range, versionId = versionId, s3Headers = s3Headers) + .map(response => response.withEntity(response.entity.withoutSizeLimit)) + .mapAsync(parallelism = 1)(entityForSuccess) + .map { + case (entity, headers) => + Option((entity.dataBytes.mapMaterializedValue(_ => NotUsed), computeMetaData(headers, entity))) + } + .recover[Option[(Source[ByteString, NotUsed], ObjectMetadata)]] { + case e: S3Exception if e.code == "NoSuchKey" => None + } + } }.mapMaterializedValue(_ => NotUsed) def listBucket(bucket: String, prefix: Option[String] = None): Source[ListBucketResultContents, NotUsed] = { @@ -136,8 +138,7 @@ import akka.util.ByteString )(implicit mat: ActorMaterializer, attr: Attributes): Future[Option[(ListBucketState, Seq[ListBucketResultContents])]] = { import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, prefix, token)) .map { (res: ListBucketResult) => @@ -150,8 +151,10 @@ import akka.util.ByteString } } - Setup - .source { implicit mat => implicit attr => + Source + .setup { (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr Source .unfoldAsync[ListBucketState, Seq[ListBucketResultContents]](Starting) { case Finished => Future.successful(None) @@ -167,32 +170,35 @@ import akka.util.ByteString key: String, versionId: Option[String], sse: Option[ServerSideEncryption]): Source[Option[ObjectMetadata], NotUsed] = - Setup - .source { implicit mat => _ => + Source + .setup { (mat, attr) => + implicit val materializer = mat import mat.executionContext val s3Headers = sse.toIndexedSeq.flatMap(_.headersFor(HeadObject)) - request(S3Location(bucket, key), HttpMethods.HEAD, versionId = versionId, s3Headers = s3Headers).flatMapConcat { - case HttpResponse(OK, headers, entity, _) => - Source.fromFuture { - entity.withoutSizeLimit().discardBytes().future().map { _ => - Some(computeMetaData(headers, entity)) + request(S3Location(bucket, key), HttpMethods.HEAD, versionId = versionId, s3Headers = s3Headers) + .flatMapConcat { + case HttpResponse(OK, headers, entity, _) => + Source.fromFuture { + entity.withoutSizeLimit().discardBytes().future().map { _ => + Some(computeMetaData(headers, entity)) + } } - } - case HttpResponse(NotFound, _, entity, _) => - Source.fromFuture(entity.discardBytes().future().map(_ => None)) - case HttpResponse(code, _, entity, _) => - Source.fromFuture { - Unmarshal(entity).to[String].map { err => - throw new S3Exception(err, code) + case HttpResponse(NotFound, _, entity, _) => + Source.fromFuture(entity.discardBytes().future().map(_ => None)) + case HttpResponse(code, _, entity, _) => + Source.fromFuture { + Unmarshal(entity).to[String].map { err => + throw new S3Exception(err, code) + } } - } - } + } } .mapMaterializedValue(_ => NotUsed) def deleteObject(s3Location: S3Location, versionId: Option[String]): Source[Done, NotUsed] = - Setup - .source { implicit mat => _ => + Source + .setup { (mat, attr) => + implicit val m = mat import mat.executionContext request(s3Location, HttpMethods.DELETE, versionId = versionId).flatMapConcat { case HttpResponse(NoContent, _, entity, _) => @@ -224,11 +230,13 @@ import akka.util.ByteString val headers = s3Headers.headersFor(PutObject) - Setup - .source { implicit mat => implicit attr => + Source + .setup { (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr import mat.executionContext implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) val req = uploadRequest(s3Location, data, contentLength, contentType, headers) @@ -256,10 +264,11 @@ import akka.util.ByteString rangeOption: Option[ByteRange] = None, versionId: Option[String] = None, s3Headers: Seq[HttpHeader] = Seq.empty): Source[HttpResponse, NotUsed] = - Setup - .source { mat => implicit attr => + Source + .setup { (mat, attr) => + implicit val attributes = attr implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) signAndRequest(requestHeaders(getDownloadRequest(s3Location, method, s3Headers, versionId), rangeOption)) } .mapMaterializedValue(_ => NotUsed) @@ -305,10 +314,11 @@ import akka.util.ByteString method: HttpMethod, process: (HttpResponse, Materializer) => Future[T] ): Source[T, NotUsed] = - Setup - .source { mat => implicit attr => + Source + .setup { (mat, attr) => + implicit val attributes = attr implicit val sys: ActorSystem = mat.system - implicit val conf: S3Settings = resolveSettings() + implicit val conf: S3Settings = resolveSettings(attr, mat.system) val location = S3Location(bucket = bucket, key = "") @@ -380,11 +390,13 @@ import akka.util.ByteString private def initiateMultipartUpload(s3Location: S3Location, contentType: ContentType, s3Headers: Seq[HttpHeader]): Source[MultipartUpload, NotUsed] = - Setup - .source { implicit mat => implicit attr => + Source + .setup { (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr import mat.executionContext implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) val req = initiateMultipartUploadRequest(s3Location, contentType, s3Headers) @@ -463,8 +475,7 @@ import akka.util.ByteString } import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) val headers = sse.toIndexedSeq.flatMap(_.headersFor(UploadPart)) @@ -510,10 +521,9 @@ import akka.util.ByteString val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(UploadPart)) - Setup - .flow { mat => implicit attr => - implicit val sys = mat.system - implicit val conf = resolveSettings() + Flow + .setup { (mat, attr) => + implicit val conf = resolveSettings(attr, mat.system) SplitAfterSize(chunkSize, MaxChunkSize)(atLeastOneByteString) .via(getChunkBuffer(chunkSize)) //creates the chunks @@ -570,11 +580,12 @@ import akka.util.ByteString val requestFlow = createRequests(s3Location, contentType, s3Headers, chunkSize, parallelism) // The individual upload part requests are processed here - Setup - .flow { implicit mat => implicit attr => + Flow + .setup { (mat, attr) => + implicit val materializer = mat import mat.executionContext implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) requestFlow .via(superPool[(MultipartUpload, Int)]) @@ -611,8 +622,10 @@ import akka.util.ByteString s3Location: S3Location, sse: Option[ServerSideEncryption] ): Sink[UploadPartResponse, Future[MultipartUploadResult]] = - Setup - .sink { implicit mat => implicit attr => + Sink + .setup { (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr val sys = mat.system import sys.dispatcher Sink @@ -666,7 +679,7 @@ import akka.util.ByteString request: HttpRequest, retries: Int = 3 )(implicit sys: ActorSystem, attr: Attributes): Source[HttpResponse, NotUsed] = { - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, sys) Signer .signedRequest(request, signingKey) @@ -717,10 +730,9 @@ import akka.util.ByteString val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(CopyPart)) - Setup - .source { mat => implicit attr => - implicit val sys = mat.system - implicit val conf = resolveSettings() + Source + .setup { (mat, attr) => + implicit val conf = resolveSettings(attr, mat.system) requestInfo .zipWith(partitions) { @@ -742,38 +754,41 @@ import akka.util.ByteString private def processUploadCopyPartRequests( requests: Source[(HttpRequest, MultipartCopy), NotUsed] )(parallelism: Int) = - Setup.source { implicit mat => implicit attr => - import mat.executionContext - implicit val sys = mat.system - implicit val settings = resolveSettings() - - requests - .via(superPool[MultipartCopy]) - .map { - case (Success(r), multipartCopy) => - val entity = r.entity - val upload = multipartCopy.multipartUpload - val index = multipartCopy.copyPartition.partNumber - import StatusCodes._ - r.status match { - case OK => - Unmarshal(entity).to[CopyPartResult].map(cp => SuccessfulUploadPart(upload, index, cp.eTag)) - case statusCode: StatusCode => - Unmarshal(entity).to[String].map { err => - val response = Option(err).getOrElse(s"Failed to upload part into S3, status code was: $statusCode") - throw new S3Exception(response, statusCode) - } - } + Source + .setup { (mat, attr) => + implicit val materializer = mat + import mat.executionContext + implicit val sys = mat.system + implicit val settings = resolveSettings(attr, mat.system) + + requests + .via(superPool[MultipartCopy]) + .map { + case (Success(r), multipartCopy) => + val entity = r.entity + val upload = multipartCopy.multipartUpload + val index = multipartCopy.copyPartition.partNumber + import StatusCodes._ + r.status match { + case OK => + Unmarshal(entity).to[CopyPartResult].map(cp => SuccessfulUploadPart(upload, index, cp.eTag)) + case statusCode: StatusCode => + Unmarshal(entity).to[String].map { err => + val response = + Option(err).getOrElse(s"Failed to upload part into S3, status code was: $statusCode") + throw new S3Exception(response, statusCode) + } + } - case (Failure(ex), multipartCopy) => - Future.successful( - FailedUploadPart(multipartCopy.multipartUpload, multipartCopy.copyPartition.partNumber, ex) - ) - } - .mapAsync(parallelism)(identity) - } + case (Failure(ex), multipartCopy) => + Future.successful( + FailedUploadPart(multipartCopy.multipartUpload, multipartCopy.copyPartition.partNumber, ex) + ) + } + .mapAsync(parallelism)(identity) + } - private def resolveSettings()(implicit attr: Attributes, sys: ActorSystem) = + private def resolveSettings(attr: Attributes, sys: ActorSystem) = attr .get[S3SettingsValue] .map(_.settings) diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/SetupStage.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/SetupStage.scala deleted file mode 100644 index 9142e46fb8..0000000000 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/SetupStage.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.s3.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[impl] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java index 380c04ecc8..2690004026 100644 --- a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java +++ b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java @@ -67,8 +67,7 @@ public static void main(String[] args) throws Exception { .collect(Collectors.toList()); List> messagesFromKafka = - users - .stream() + users.stream() .map(user -> new KafkaMessage<>(user, new CommittableOffset(users.indexOf(user)))) .collect(Collectors.toList()); diff --git a/slick/src/test/java/docs/javadsl/SlickTest.java b/slick/src/test/java/docs/javadsl/SlickTest.java index de178b8e45..738265b87b 100644 --- a/slick/src/test/java/docs/javadsl/SlickTest.java +++ b/slick/src/test/java/docs/javadsl/SlickTest.java @@ -226,8 +226,7 @@ public void testFlowWithPassThroughKafkaExample() throws Exception { final List usersList = new ArrayList<>(SlickTest.users); final List> messagesFromKafka = - usersList - .stream() + usersList.stream() .map(user -> new KafkaMessage<>(user, new KafkaOffset(usersList.indexOf(user)))) .collect(Collectors.toList()); diff --git a/text/src/test/java/docs/javadsl/CharsetCodingFlowsDoc.java b/text/src/test/java/docs/javadsl/CharsetCodingFlowsDoc.java index 51a1daba48..6ca3f2063f 100644 --- a/text/src/test/java/docs/javadsl/CharsetCodingFlowsDoc.java +++ b/text/src/test/java/docs/javadsl/CharsetCodingFlowsDoc.java @@ -50,9 +50,7 @@ public void encodingExample() throws Exception { Properties properties = System.getProperties(); List strings = - properties - .stringPropertyNames() - .stream() + properties.stringPropertyNames().stream() .map(p -> p + " -> " + properties.getProperty(p)) .collect(Collectors.toList()); // #encoding