From 90d646d84681304c44e559accf2e6a383e3584e6 Mon Sep 17 00:00:00 2001 From: kerr Date: Wed, 16 Nov 2022 11:22:52 +0800 Subject: [PATCH] =str Avoid sub materialization in lazySingle. --- .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../stream/impl/fusing/LazySingleSource.scala | 50 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Source.scala | 5 +- .../apache/pekko/stream/scaladsl/Source.scala | 7 ++- 4 files changed, 56 insertions(+), 7 deletions(-) create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 82e53391d43..7cdf147687c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -106,6 +106,7 @@ import pekko.stream.Attributes._ val futureFlattenSource = name("futureFlattenSource") val tickSource = name("tickSource") val singleSource = name("singleSource") + val lazySingleSource = name("lazySingleSource") val emptySource = name("emptySource") val maybeSource = name("MaybeSource") val neverSource = name("neverSource") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala new file mode 100644 index 00000000000..18ef064c960 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala @@ -0,0 +1,50 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.apache.pekko.stream.impl.fusing +import org.apache.pekko +import pekko.stream.Attributes +import pekko.stream.Attributes.SourceLocation +import pekko.stream.Outlet +import pekko.stream.SourceShape +import pekko.stream.impl.ReactiveStreamsCompliance +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.GraphStage +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.OutHandler +private[pekko] final class LazySingleSource[T](f: () => T) extends GraphStage[SourceShape[T]] { + require(f != null, "f should not be null.") + private val out = Outlet[T]("LazySingleSource.out") + override def initialAttributes: Attributes = DefaultAttributes.lazySingleSource and + SourceLocation.forLambda(f) + + val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = { + val elem = f() + ReactiveStreamsCompliance.requireNonNullElement(elem) + push(out, elem) + completeStage() + } + + setHandler(out, this) + } + override def toString: String = "LazySingleSource" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 928e5126e8d..7027949e885 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -18,9 +18,8 @@ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import org.reactivestreams.{ Publisher, Subscriber } - import org.apache.pekko +import org.reactivestreams.{ Publisher, Subscriber } import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import pekko.annotation.ApiMayChange @@ -340,7 +339,7 @@ object Source { * is failed with a [[pekko.stream.NeverMaterializedException]] */ def lazySingle[T](create: Creator[T]): Source[T, NotUsed] = - lazySource(() => single(create.create())).mapMaterializedValue(_ => NotUsed) + new Source(scaladsl.Source.lazySingle(() => create.create())) /** * Defers invoking the `create` function to create a future element until there is downstream demand. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 85ab5c7ea63..9ef8482a435 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -13,16 +13,15 @@ import scala.compat.java8.FutureConverters._ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration -import org.reactivestreams.{ Publisher, Subscriber } - import org.apache.pekko +import org.reactivestreams.{ Publisher, Subscriber } import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable } import pekko.annotation.InternalApi import pekko.stream.{ Outlet, SourceShape, _ } import pekko.stream.impl.{ PublisherSource, _ } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages +import pekko.stream.impl.fusing.{ GraphStages, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun @@ -535,7 +534,7 @@ object Source { * the laziness and will trigger the factory immediately. */ def lazySingle[T](create: () => T): Source[T, NotUsed] = - lazySource(() => single(create())).mapMaterializedValue(_ => NotUsed) + fromGraph(new LazySingleSource(create)) /** * Defers invoking the `create` function to create a future element until there is downstream demand.