Skip to content

Commit

Permalink
=str Avoid sub materialization in lazySingle.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 8, 2023
1 parent f48a889 commit 90d646d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import pekko.stream.Attributes._
val futureFlattenSource = name("futureFlattenSource")
val tickSource = name("tickSource")
val singleSource = name("singleSource")
val lazySingleSource = name("lazySingleSource")
val emptySource = name("emptySource")
val maybeSource = name("MaybeSource")
val neverSource = name("neverSource")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/

package org.apache.pekko.stream.impl.fusing
import org.apache.pekko
import pekko.stream.Attributes
import pekko.stream.Attributes.SourceLocation
import pekko.stream.Outlet
import pekko.stream.SourceShape
import pekko.stream.impl.ReactiveStreamsCompliance
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.OutHandler
private[pekko] final class LazySingleSource[T](f: () => T) extends GraphStage[SourceShape[T]] {
require(f != null, "f should not be null.")
private val out = Outlet[T]("LazySingleSource.out")
override def initialAttributes: Attributes = DefaultAttributes.lazySingleSource and
SourceLocation.forLambda(f)

val shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
override def onPull(): Unit = {
val elem = f()
ReactiveStreamsCompliance.requireNonNullElement(elem)
push(out, elem)
completeStage()
}

setHandler(out, this)
}
override def toString: String = "LazySingleSource"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import org.reactivestreams.{ Publisher, Subscriber }

import org.apache.pekko
import org.reactivestreams.{ Publisher, Subscriber }
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import pekko.annotation.ApiMayChange
Expand Down Expand Up @@ -340,7 +339,7 @@ object Source {
* is failed with a [[pekko.stream.NeverMaterializedException]]
*/
def lazySingle[T](create: Creator[T]): Source[T, NotUsed] =
lazySource(() => single(create.create())).mapMaterializedValue(_ => NotUsed)
new Source(scaladsl.Source.lazySingle(() => create.create()))

/**
* Defers invoking the `create` function to create a future element until there is downstream demand.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ import scala.compat.java8.FutureConverters._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration

import org.reactivestreams.{ Publisher, Subscriber }

import org.apache.pekko
import org.reactivestreams.{ Publisher, Subscriber }
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable }
import pekko.annotation.InternalApi
import pekko.stream.{ Outlet, SourceShape, _ }
import pekko.stream.impl.{ PublisherSource, _ }
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.GraphStages
import pekko.stream.impl.fusing.{ GraphStages, LazySingleSource }
import pekko.stream.impl.fusing.GraphStages._
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.util.ConstantFun
Expand Down Expand Up @@ -535,7 +534,7 @@ object Source {
* the laziness and will trigger the factory immediately.
*/
def lazySingle[T](create: () => T): Source[T, NotUsed] =
lazySource(() => single(create())).mapMaterializedValue(_ => NotUsed)
fromGraph(new LazySingleSource(create))

/**
* Defers invoking the `create` function to create a future element until there is downstream demand.
Expand Down

0 comments on commit 90d646d

Please sign in to comment.