Skip to content

Commit

Permalink
Scala 3.3.1 and improve Scala 3 compatibility (#3010)
Browse files Browse the repository at this point in the history
* Add type annotations
* bump Scala 3.3.1
  • Loading branch information
Arikuti authored Sep 18, 2023
1 parent 1331be7 commit ca9af1b
Show file tree
Hide file tree
Showing 87 changed files with 367 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ class AmqpGraphStageLogicConnectionShutdownSpec
with BeforeAndAfterEach
with LogCapturing {

override implicit val patienceConfig = PatienceConfig(10.seconds)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

val shutdownsAdded = new AtomicInteger()
val shutdownsRemoved = new AtomicInteger()

override def beforeEach() = {
override def beforeEach(): Unit = {
shutdownsAdded.set(0)
shutdownsRemoved.set(0)
}
Expand All @@ -65,7 +65,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
"registers and unregisters a single connection shutdown hook per graph" in {
// actor system is within this test as it has to be shut down in order
// to verify graph stage termination
implicit val system = ActorSystem(this.getClass.getSimpleName + System.currentTimeMillis())
implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName + System.currentTimeMillis())

val connectionFactory = new ConnectionFactory() {
override def newConnection(es: ExecutorService, ar: AddressResolver, name: String): Connection =
Expand Down
4 changes: 2 additions & 2 deletions amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import scala.collection.immutable
*/
class AmqpDocsSpec extends AmqpSpec {

override implicit val patienceConfig = PatienceConfig(10.seconds)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)

val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_)
val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful

"The AMQP Connectors" should {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package docs.scaladsl

import java.util.concurrent.CompletableFuture
import akka.actor.ActorSystem
import akka.stream.alpakka.awslambda.scaladsl.AwsLambdaFlow
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
Expand All @@ -16,16 +15,17 @@ import org.mockito.ArgumentMatchers.{any => mockitoAny, eq => mockitoEq}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.lambda.LambdaAsyncClient
import software.amazon.awssdk.services.lambda.model.{InvokeRequest, InvokeResponse}

import scala.concurrent.{Await, ExecutionContext}
import java.util.concurrent.CompletableFuture
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}

class AwsLambdaFlowSpec
extends TestKit(ActorSystem("AwsLambdaFlowSpec"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import scala.collection.mutable.Queue
setHandler(
out,
new OutHandler {
override def onPull: Unit =
override def onPull(): Unit =
if (!buffer.isEmpty) {
push(out, buffer.dequeue())
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.cassandra.scaladsl

import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.testkit.TestKitBase
import com.datastax.oss.driver.api.core.cql._
Expand All @@ -16,7 +15,7 @@ import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures}
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.compat.java8.FutureConverters._

Expand Down Expand Up @@ -63,7 +62,7 @@ trait CassandraLifecycleBase {
executeCql(lifecycleSession, statements.asScala.toList).toJava

def withSchemaMetadataDisabled(block: => Future[Done]): Future[Done] = {
implicit val ec = lifecycleSession.ec
implicit val ec: ExecutionContext = lifecycleSession.ec
lifecycleSession.underlying().flatMap { cqlSession =>
cqlSession.setSchemaMetadataEnabled(false)
val blockResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class AkkaDiscoverySpec extends CassandraSpecBase(ActorSystem("AkkaDiscoverySpec
"show referencing config in docs" in {
// #discovery
val sessionSettings = CassandraSessionSettings("example-with-akka-discovery")
implicit val session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings)
implicit val session: CassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings)
// #discovery
session.close(system.dispatcher)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.scalatest.wordspec.AnyWordSpec

class PassThroughExamples extends AnyWordSpec with BeforeAndAfterAll with Matchers with ScalaFutures {

implicit val system = ActorSystem("Test")
implicit val system: ActorSystem = ActorSystem("Test")

"PassThroughFlow" should {
" original message is maintained " in {
Expand Down Expand Up @@ -93,7 +93,7 @@ object PassThroughFlow {
//#PassThrough

object PassThroughFlowKafkaCommitExample {
implicit val system = ActorSystem("Test")
implicit val system: ActorSystem = ActorSystem("Test")

def dummy(): Unit = {
// #passThroughKafkaFlow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.annotation.{ApiMayChange, InternalApi}
import akka.http.scaladsl.{Http, HttpExt}
import akka.stream.alpakka.elasticsearch.{impl, _}
import akka.stream.alpakka.elasticsearch._
import akka.stream.scaladsl.{Flow, FlowWithContext, RetryFlow}
import spray.json._

import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.ExecutionContext

/**
* Scala API to create Elasticsearch flows.
Expand Down Expand Up @@ -210,7 +210,7 @@ object ElasticsearchFlow {
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
implicit val ec: ExecutionContextExecutor = mat.executionContext
implicit val ec: ExecutionContext = mat.executionContext

Flow.fromGraph {
new impl.ElasticsearchSimpleFlowStage[T, C](elasticsearchParams, settings, writer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package akka.stream.alpakka.elasticsearch.scaladsl
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpExt}
import akka.stream.alpakka.elasticsearch.{impl, _}
import akka.stream.alpakka.elasticsearch._
import akka.stream.scaladsl.Source
import spray.json._

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.ExecutionContext

/**
* Scala API to create Elasticsearch sources.
Expand Down Expand Up @@ -67,7 +67,7 @@ object ElasticsearchSource {
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
implicit val ec: ExecutionContextExecutor = mat.executionContext
implicit val ec: ExecutionContext = mat.executionContext

val sourceStage = new impl.ElasticsearchSourceStage(
elasticsearchParams,
Expand Down Expand Up @@ -106,7 +106,7 @@ object ElasticsearchSource {
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
implicit val ec: ExecutionContextExecutor = mat.executionContext
implicit val ec: ExecutionContext = mat.executionContext

Source.fromGraph(
new impl.ElasticsearchSourceStage(elasticsearchParams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett
val ftpClient: () => FtpClient
) extends GraphStageLogic(shape) {

protected[this] implicit val client = ftpClient()
protected[this] implicit val client: FtpClient = ftpClient()
protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler]
protected[this] var failed = false

Expand Down
9 changes: 6 additions & 3 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>
@InternalApi
object FtpLike {
// type class instances
implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations
implicit val sFtpLikeInstance =
implicit val ftpLikeInstance: FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations =
new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
implicit val ftpsLikeInstance: FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations =
new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations
implicit val sFtpLikeInstance
: FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads =
new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.ftp;
Expand Down
29 changes: 15 additions & 14 deletions ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,25 @@

package akka.stream.alpakka.ftp

import java.io.IOException
import java.net.InetAddress
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{Files, Paths}
import java.time.Instant
import java.util.concurrent.TimeUnit
import akka.stream.{IOOperationIncompleteException, IOResult}
import BaseSftpSupport.{CLIENT_PRIVATE_KEY_PASSPHRASE => ClientPrivateKeyPassphrase}
import akka.actor.ActorSystem
import akka.stream.alpakka.ftp.BaseSftpSupport.{CLIENT_PRIVATE_KEY_PASSPHRASE => ClientPrivateKeyPassphrase}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{IOOperationIncompleteException, IOResult, Materializer}
import akka.util.ByteString
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}

import java.io.IOException
import java.net.InetAddress
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{Files, Paths}
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Random

final class FtpStageSpec extends BaseFtpSpec with CommonFtpStageSpec
Expand Down Expand Up @@ -84,14 +85,14 @@ final class UnconfirmedReadsSftpSourceSpec extends BaseSftpSpec with CommonFtpSt

trait CommonFtpStageSpec extends BaseSpec with Eventually {

implicit val system = getSystem
implicit val mat = getMaterializer
implicit val defaultPatience =
implicit val system: ActorSystem = getSystem
implicit val mat: Materializer = getMaterializer
implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = Span(30, Seconds), interval = Span(600, Millis))

"FtpBrowserSource" should {
"complete with a failed Future, when the credentials supplied were wrong" in assertAllStagesStopped {
implicit val ec = system.getDispatcher
implicit val ec: ExecutionContext = system.getDispatcher
listFilesWithWrongCredentials("")
.toMat(Sink.seq)(Keep.right)
.run()
Expand Down Expand Up @@ -494,7 +495,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val source = mkdir(basePath, name)
val innerSource = mkdir(innerDirPath, innerDirName)

implicit val ec: ExecutionContextExecutor = mat.executionContext
implicit val ec: ExecutionContext = mat.executionContext

val res = for {
x <- source.runWith(Sink.head)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object PdxDecoder {

private def instance[A](f: (PdxReader, Symbol) => Try[A]): PdxDecoder[A] =
new PdxDecoder[A] {
def decode(reader: PdxReader, fieldName: Symbol) = f(reader, fieldName)
def decode(reader: PdxReader, fieldName: Symbol): Try[A] = f(reader, fieldName)
}

implicit val hnilDecoder: PdxDecoder[HNil] = instance((_, _) => Success(HNil))
Expand Down Expand Up @@ -145,14 +145,13 @@ object PdxDecoder {
hDecoder: Lazy[PdxDecoder[H]],
tDecoder: Lazy[PdxDecoder[T]]
): PdxDecoder[FieldType[K, H] :: T] = instance {
case (reader, fieldName) => {
case (reader, fieldName) =>
val headField = hDecoder.value.decode(reader, witness.value)
val tailFields = tDecoder.value.decode(reader, fieldName)
(headField, tailFields) match {
case (Success(h), Success(t)) => Success(field[K](h) :: t)
case _ => Failure(null)
}
}
case e => Failure(null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ object PdxMocks {
override def writeByte(fieldName: String, value: Byte) = { println(s"Write $value"); this }
}

implicit val writerMock = new WriterMock()
implicit val writerMock: WriterMock = new WriterMock()
}
2 changes: 1 addition & 1 deletion geode/src/test/scala/docs/scaladsl/GeodeBaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec

class GeodeBaseSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with LogCapturing {

implicit val system = ActorSystem("test")
implicit val system: ActorSystem = ActorSystem("test")

//#region
val personsRegionSettings: RegionSettings[Int, Person] = RegionSettings("persons", (p: Person) => p.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import java.util.concurrent.Executor
.fold(settings.withTls(false))(_ => settings.withTls(true))

val setCallCredentials = (settings: GrpcClientSettings) => {
implicit val config = system.classicSystem.settings.config
implicit val config: Config = system.classicSystem.settings.config
val executor: Executor = system.classicSystem.dispatcher
settings.withCallCredentials(MoreCallCredentials.from(credentials().asGoogle(executor, requestSettings())))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ import akka.util.ByteString
import com.google.cloud.bigquery.storage.v1.DataFormat
import com.google.cloud.bigquery.storage.v1.storage.{BigQueryReadClient, CreateReadSessionRequest, ReadRowsResponse}
import com.google.cloud.bigquery.storage.v1.stream.ReadSession.TableReadOptions
import com.google.cloud.bigquery.storage.v1.stream.ReadSession
import com.google.cloud.bigquery.storage.v1.stream.{ReadSession, DataFormat => StreamDataFormat}

import com.google.cloud.bigquery.storage.v1.stream.{DataFormat => StreamDataFormat}

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.{ExecutionContext, Future}

/**
* Google BigQuery Storage Api Akka Stream operator factory.
Expand Down Expand Up @@ -54,7 +52,7 @@ object BigQueryStorage {
Source.fromMaterializer { (mat, attr) =>
{
implicit val materializer: Materializer = mat
implicit val executionContext: ExecutionContextExecutor = materializer.executionContext
implicit val executionContext: ExecutionContext = materializer.executionContext
val client = reader(mat.system, attr).client
readSession(client, projectId, datasetId, tableId, dataFormat, readOptions, maxNumStreams)
.map { session =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object GrpcBigQueryStorageReader {
* An extension that manages a single gRPC scala reader client per actor system.
*/
final class GrpcBigQueryStorageReaderExt private (sys: ExtendedActorSystem) extends Extension {
implicit val reader = GrpcBigQueryStorageReader()(sys)
implicit val reader: GrpcBigQueryStorageReader = GrpcBigQueryStorageReader()(sys)
}

object GrpcBigQueryStorageReaderExt extends ExtensionId[GrpcBigQueryStorageReaderExt] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.concurrent.Future

class ExampleReader {

implicit val sys = ActorSystem("ExampleReader")
implicit val sys: ActorSystem = ActorSystem("ExampleReader")

//#read-all
val sourceOfSources: Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] =
Expand Down
Loading

0 comments on commit ca9af1b

Please sign in to comment.