From 43d4737902c17adc302984ef778288d9a5834dbc Mon Sep 17 00:00:00 2001 From: kerr Date: Wed, 16 Nov 2022 11:22:52 +0800 Subject: [PATCH] =str Avoid sub materialization in lazySingle. --- .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../stream/impl/fusing/LazySingleSource.scala | 51 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Source.scala | 2 +- .../apache/pekko/stream/scaladsl/Source.scala | 4 +- 4 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index aa6ef2c1ca1..487ae429902 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -115,6 +115,7 @@ import pekko.stream.Attributes._ val futureFlattenSource = name("futureFlattenSource") val tickSource = name("tickSource") val singleSource = name("singleSource") + val lazySingleSource = name("lazySingleSource") val emptySource = name("emptySource") val maybeSource = name("MaybeSource") val neverSource = name("neverSource") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala new file mode 100644 index 00000000000..26757b3b88e --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazySingleSource.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.stream.Attributes +import pekko.stream.Attributes.SourceLocation +import pekko.stream.Outlet +import pekko.stream.SourceShape +import pekko.stream.impl.ReactiveStreamsCompliance +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.GraphStage +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.OutHandler + +private[pekko] final class LazySingleSource[T](f: () => T) extends GraphStage[SourceShape[T]] { + require(f != null, "f should not be null.") + private val out = Outlet[T]("LazySingleSource.out") + override def initialAttributes: Attributes = DefaultAttributes.lazySingleSource and + SourceLocation.forLambda(f) + + val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = { + setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more + val elem = f() + ReactiveStreamsCompliance.requireNonNullElement(elem) + push(out, elem) + completeStage() + } + + setHandler(out, this) + } + override def toString: String = "LazySingleSource" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index d674c1a56fb..f27105d3d83 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -348,7 +348,7 @@ object Source { * is failed with a [[pekko.stream.NeverMaterializedException]] */ def lazySingle[T](create: Creator[T]): Source[T, NotUsed] = - lazySource(() => single(create.create())).mapMaterializedValue(_ => NotUsed) + new Source(scaladsl.Source.lazySingle(() => create.create())) /** * Defers invoking the `create` function to create a future element until there is downstream demand. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 7abf2693963..d03c7cc39c4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -30,7 +30,7 @@ import pekko.annotation.InternalApi import pekko.stream.{ Outlet, SourceShape, _ } import pekko.stream.impl.{ PublisherSource, _ } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages +import pekko.stream.impl.fusing.{ GraphStages, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun @@ -544,7 +544,7 @@ object Source { * the laziness and will trigger the factory immediately. */ def lazySingle[T](create: () => T): Source[T, NotUsed] = - lazySource(() => single(create())).mapMaterializedValue(_ => NotUsed) + fromGraph(new LazySingleSource(create)) /** * Defers invoking the `create` function to create a future element until there is downstream demand.