diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala index 37a5f03562..6422419240 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala @@ -39,13 +39,13 @@ class AmqpGraphStageLogicConnectionShutdownSpec with BeforeAndAfterEach with LogCapturing { - override implicit val patienceConfig = PatienceConfig(10.seconds) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds) private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic val shutdownsAdded = new AtomicInteger() val shutdownsRemoved = new AtomicInteger() - override def beforeEach() = { + override def beforeEach(): Unit = { shutdownsAdded.set(0) shutdownsRemoved.set(0) } @@ -65,7 +65,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec "registers and unregisters a single connection shutdown hook per graph" in { // actor system is within this test as it has to be shut down in order // to verify graph stage termination - implicit val system = ActorSystem(this.getClass.getSimpleName + System.currentTimeMillis()) + implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName + System.currentTimeMillis()) val connectionFactory = new ConnectionFactory() { override def newConnection(es: ExecutorService, ar: AddressResolver, name: String): Connection = diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala index 1fb10273d3..89340cc108 100644 --- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala +++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala @@ -23,9 +23,9 @@ import scala.collection.immutable */ class AmqpDocsSpec extends AmqpSpec { - override implicit val patienceConfig = PatienceConfig(10.seconds) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds) - val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_) + val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful "The AMQP Connectors" should { diff --git a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala index 76e3ba38d0..919add4e90 100644 --- a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala +++ b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala @@ -4,7 +4,6 @@ package docs.scaladsl -import java.util.concurrent.CompletableFuture import akka.actor.ActorSystem import akka.stream.alpakka.awslambda.scaladsl.AwsLambdaFlow import akka.stream.alpakka.testkit.scaladsl.LogCapturing @@ -16,16 +15,17 @@ import org.mockito.ArgumentMatchers.{any => mockitoAny, eq => mockitoEq} import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.lambda.LambdaAsyncClient import software.amazon.awssdk.services.lambda.model.{InvokeRequest, InvokeResponse} -import scala.concurrent.{Await, ExecutionContext} +import java.util.concurrent.CompletableFuture import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext} class AwsLambdaFlowSpec extends TestKit(ActorSystem("AwsLambdaFlowSpec")) diff --git a/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/impl/AzureQueueSourceStage.scala b/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/impl/AzureQueueSourceStage.scala index e75ed32489..b50810a3b3 100644 --- a/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/impl/AzureQueueSourceStage.scala +++ b/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/impl/AzureQueueSourceStage.scala @@ -58,7 +58,7 @@ import scala.collection.mutable.Queue setHandler( out, new OutHandler { - override def onPull: Unit = + override def onPull(): Unit = if (!buffer.isEmpty) { push(out, buffer.dequeue()) } else { diff --git a/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraLifecycle.scala b/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraLifecycle.scala index 712c3e7253..938c717e71 100644 --- a/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraLifecycle.scala +++ b/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraLifecycle.scala @@ -6,7 +6,6 @@ package akka.stream.alpakka.cassandra.scaladsl import java.util.concurrent.CompletionStage import java.util.concurrent.atomic.AtomicInteger - import akka.Done import akka.testkit.TestKitBase import com.datastax.oss.driver.api.core.cql._ @@ -16,7 +15,7 @@ import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures} import scala.collection.JavaConverters._ import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.control.NonFatal import scala.compat.java8.FutureConverters._ @@ -63,7 +62,7 @@ trait CassandraLifecycleBase { executeCql(lifecycleSession, statements.asScala.toList).toJava def withSchemaMetadataDisabled(block: => Future[Done]): Future[Done] = { - implicit val ec = lifecycleSession.ec + implicit val ec: ExecutionContext = lifecycleSession.ec lifecycleSession.underlying().flatMap { cqlSession => cqlSession.setSchemaMetadataEnabled(false) val blockResult = diff --git a/cassandra/src/test/scala/docs/scaladsl/AkkaDiscoverySpec.scala b/cassandra/src/test/scala/docs/scaladsl/AkkaDiscoverySpec.scala index cc2cac3f83..ac58337896 100644 --- a/cassandra/src/test/scala/docs/scaladsl/AkkaDiscoverySpec.scala +++ b/cassandra/src/test/scala/docs/scaladsl/AkkaDiscoverySpec.scala @@ -60,7 +60,7 @@ class AkkaDiscoverySpec extends CassandraSpecBase(ActorSystem("AkkaDiscoverySpec "show referencing config in docs" in { // #discovery val sessionSettings = CassandraSessionSettings("example-with-akka-discovery") - implicit val session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings) + implicit val session: CassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings) // #discovery session.close(system.dispatcher) } diff --git a/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala b/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala index 78dcd3936c..73cc9b8c02 100644 --- a/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala +++ b/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala @@ -19,7 +19,7 @@ import org.scalatest.wordspec.AnyWordSpec class PassThroughExamples extends AnyWordSpec with BeforeAndAfterAll with Matchers with ScalaFutures { - implicit val system = ActorSystem("Test") + implicit val system: ActorSystem = ActorSystem("Test") "PassThroughFlow" should { " original message is maintained " in { @@ -93,7 +93,7 @@ object PassThroughFlow { //#PassThrough object PassThroughFlowKafkaCommitExample { - implicit val system = ActorSystem("Test") + implicit val system: ActorSystem = ActorSystem("Test") def dummy(): Unit = { // #passThroughKafkaFlow diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala index a2163398d1..7f594384ff 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala @@ -8,12 +8,12 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.annotation.{ApiMayChange, InternalApi} import akka.http.scaladsl.{Http, HttpExt} -import akka.stream.alpakka.elasticsearch.{impl, _} +import akka.stream.alpakka.elasticsearch._ import akka.stream.scaladsl.{Flow, FlowWithContext, RetryFlow} import spray.json._ import scala.collection.immutable -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.ExecutionContext /** * Scala API to create Elasticsearch flows. @@ -210,7 +210,7 @@ object ElasticsearchFlow { .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() - implicit val ec: ExecutionContextExecutor = mat.executionContext + implicit val ec: ExecutionContext = mat.executionContext Flow.fromGraph { new impl.ElasticsearchSimpleFlowStage[T, C](elasticsearchParams, settings, writer) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala index e8893ea016..10d7162513 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala @@ -7,11 +7,11 @@ package akka.stream.alpakka.elasticsearch.scaladsl import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.{Http, HttpExt} -import akka.stream.alpakka.elasticsearch.{impl, _} +import akka.stream.alpakka.elasticsearch._ import akka.stream.scaladsl.Source import spray.json._ -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.ExecutionContext /** * Scala API to create Elasticsearch sources. @@ -67,7 +67,7 @@ object ElasticsearchSource { .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() - implicit val ec: ExecutionContextExecutor = mat.executionContext + implicit val ec: ExecutionContext = mat.executionContext val sourceStage = new impl.ElasticsearchSourceStage( elasticsearchParams, @@ -106,7 +106,7 @@ object ElasticsearchSource { .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() - implicit val ec: ExecutionContextExecutor = mat.executionContext + implicit val ec: ExecutionContext = mat.executionContext Source.fromGraph( new impl.ElasticsearchSourceStage(elasticsearchParams, diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala index b55a70a567..1c4f438a04 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala @@ -24,7 +24,7 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett val ftpClient: () => FtpClient ) extends GraphStageLogic(shape) { - protected[this] implicit val client = ftpClient() + protected[this] implicit val client: FtpClient = ftpClient() protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler] protected[this] var failed = false diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala index 6f5e7d911b..2e46c8a984 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala @@ -67,8 +67,11 @@ protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] => @InternalApi object FtpLike { // type class instances - implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations - implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations - implicit val sFtpLikeInstance = + implicit val ftpLikeInstance: FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations = + new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations + implicit val ftpsLikeInstance: FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations = + new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations + implicit val sFtpLikeInstance + : FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads = new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads } diff --git a/ftp/src/test/java/akka/stream/alpakka/ftp/FtpsWithTrustAndKeyManagersStageTest.java b/ftp/src/test/java/akka/stream/alpakka/ftp/FtpsWithTrustAndKeyManagersStageTest.java index 208d567354..4864049fb7 100644 --- a/ftp/src/test/java/akka/stream/alpakka/ftp/FtpsWithTrustAndKeyManagersStageTest.java +++ b/ftp/src/test/java/akka/stream/alpakka/ftp/FtpsWithTrustAndKeyManagersStageTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016-2020 Lightbend Inc. + * Copyright (C) since 2016 Lightbend Inc. */ package akka.stream.alpakka.ftp; diff --git a/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala b/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala index 0f38fd5f60..30a11e0d55 100644 --- a/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala +++ b/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala @@ -4,24 +4,25 @@ package akka.stream.alpakka.ftp -import java.io.IOException -import java.net.InetAddress -import java.nio.file.attribute.PosixFilePermission -import java.nio.file.{Files, Paths} -import java.time.Instant -import java.util.concurrent.TimeUnit -import akka.stream.{IOOperationIncompleteException, IOResult} -import BaseSftpSupport.{CLIENT_PRIVATE_KEY_PASSPHRASE => ClientPrivateKeyPassphrase} +import akka.actor.ActorSystem +import akka.stream.alpakka.ftp.BaseSftpSupport.{CLIENT_PRIVATE_KEY_PASSPHRASE => ClientPrivateKeyPassphrase} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{IOOperationIncompleteException, IOResult, Materializer} import akka.util.ByteString import org.scalatest.concurrent.Eventually import org.scalatest.time.{Millis, Seconds, Span} +import java.io.IOException +import java.net.InetAddress +import java.nio.file.attribute.PosixFilePermission +import java.nio.file.{Files, Paths} +import java.time.Instant +import java.util.concurrent.TimeUnit import scala.collection.immutable -import scala.concurrent.{Await, ExecutionContextExecutor} import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext} import scala.util.Random final class FtpStageSpec extends BaseFtpSpec with CommonFtpStageSpec @@ -84,14 +85,14 @@ final class UnconfirmedReadsSftpSourceSpec extends BaseSftpSpec with CommonFtpSt trait CommonFtpStageSpec extends BaseSpec with Eventually { - implicit val system = getSystem - implicit val mat = getMaterializer - implicit val defaultPatience = + implicit val system: ActorSystem = getSystem + implicit val mat: Materializer = getMaterializer + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = Span(30, Seconds), interval = Span(600, Millis)) "FtpBrowserSource" should { "complete with a failed Future, when the credentials supplied were wrong" in assertAllStagesStopped { - implicit val ec = system.getDispatcher + implicit val ec: ExecutionContext = system.getDispatcher listFilesWithWrongCredentials("") .toMat(Sink.seq)(Keep.right) .run() @@ -494,7 +495,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually { val source = mkdir(basePath, name) val innerSource = mkdir(innerDirPath, innerDirName) - implicit val ec: ExecutionContextExecutor = mat.executionContext + implicit val ec: ExecutionContext = mat.executionContext val res = for { x <- source.runWith(Sink.head) diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/impl/pdx/PdxDecoder.scala b/geode/src/main/scala/akka/stream/alpakka/geode/impl/pdx/PdxDecoder.scala index 5f74b28599..468e3b6f06 100644 --- a/geode/src/main/scala/akka/stream/alpakka/geode/impl/pdx/PdxDecoder.scala +++ b/geode/src/main/scala/akka/stream/alpakka/geode/impl/pdx/PdxDecoder.scala @@ -25,7 +25,7 @@ object PdxDecoder { private def instance[A](f: (PdxReader, Symbol) => Try[A]): PdxDecoder[A] = new PdxDecoder[A] { - def decode(reader: PdxReader, fieldName: Symbol) = f(reader, fieldName) + def decode(reader: PdxReader, fieldName: Symbol): Try[A] = f(reader, fieldName) } implicit val hnilDecoder: PdxDecoder[HNil] = instance((_, _) => Success(HNil)) @@ -145,14 +145,13 @@ object PdxDecoder { hDecoder: Lazy[PdxDecoder[H]], tDecoder: Lazy[PdxDecoder[T]] ): PdxDecoder[FieldType[K, H] :: T] = instance { - case (reader, fieldName) => { + case (reader, fieldName) => val headField = hDecoder.value.decode(reader, witness.value) val tailFields = tDecoder.value.decode(reader, fieldName) (headField, tailFields) match { case (Success(h), Success(t)) => Success(field[K](h) :: t) case _ => Failure(null) } - } case e => Failure(null) } diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/impl/pdx/PdxWriterMock.scala b/geode/src/test/scala/akka/stream/alpakka/geode/impl/pdx/PdxWriterMock.scala index 4e35587015..8aacfda7f8 100644 --- a/geode/src/test/scala/akka/stream/alpakka/geode/impl/pdx/PdxWriterMock.scala +++ b/geode/src/test/scala/akka/stream/alpakka/geode/impl/pdx/PdxWriterMock.scala @@ -79,5 +79,5 @@ object PdxMocks { override def writeByte(fieldName: String, value: Byte) = { println(s"Write $value"); this } } - implicit val writerMock = new WriterMock() + implicit val writerMock: WriterMock = new WriterMock() } diff --git a/geode/src/test/scala/docs/scaladsl/GeodeBaseSpec.scala b/geode/src/test/scala/docs/scaladsl/GeodeBaseSpec.scala index 1f63a40030..9c9b8a66da 100644 --- a/geode/src/test/scala/docs/scaladsl/GeodeBaseSpec.scala +++ b/geode/src/test/scala/docs/scaladsl/GeodeBaseSpec.scala @@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec class GeodeBaseSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with LogCapturing { - implicit val system = ActorSystem("test") + implicit val system: ActorSystem = ActorSystem("test") //#region val personsRegionSettings: RegionSettings[Int, Person] = RegionSettings("persons", (p: Person) => p.id) diff --git a/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/impl/AkkaGrpcSettings.scala b/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/impl/AkkaGrpcSettings.scala index 39fcfe4a0f..29a9a1c299 100644 --- a/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/impl/AkkaGrpcSettings.scala +++ b/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/impl/AkkaGrpcSettings.scala @@ -54,7 +54,7 @@ import java.util.concurrent.Executor .fold(settings.withTls(false))(_ => settings.withTls(true)) val setCallCredentials = (settings: GrpcClientSettings) => { - implicit val config = system.classicSystem.settings.config + implicit val config: Config = system.classicSystem.settings.config val executor: Executor = system.classicSystem.dispatcher settings.withCallCredentials(MoreCallCredentials.from(credentials().asGoogle(executor, requestSettings()))) } diff --git a/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/BigQueryStorage.scala b/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/BigQueryStorage.scala index b53860d836..be1f223c0e 100644 --- a/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/BigQueryStorage.scala +++ b/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/BigQueryStorage.scala @@ -14,11 +14,9 @@ import akka.util.ByteString import com.google.cloud.bigquery.storage.v1.DataFormat import com.google.cloud.bigquery.storage.v1.storage.{BigQueryReadClient, CreateReadSessionRequest, ReadRowsResponse} import com.google.cloud.bigquery.storage.v1.stream.ReadSession.TableReadOptions -import com.google.cloud.bigquery.storage.v1.stream.ReadSession +import com.google.cloud.bigquery.storage.v1.stream.{ReadSession, DataFormat => StreamDataFormat} -import com.google.cloud.bigquery.storage.v1.stream.{DataFormat => StreamDataFormat} - -import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.concurrent.{ExecutionContext, Future} /** * Google BigQuery Storage Api Akka Stream operator factory. @@ -54,7 +52,7 @@ object BigQueryStorage { Source.fromMaterializer { (mat, attr) => { implicit val materializer: Materializer = mat - implicit val executionContext: ExecutionContextExecutor = materializer.executionContext + implicit val executionContext: ExecutionContext = materializer.executionContext val client = reader(mat.system, attr).client readSession(client, projectId, datasetId, tableId, dataFormat, readOptions, maxNumStreams) .map { session => diff --git a/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/GrpcBigQueryStorageReader.scala b/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/GrpcBigQueryStorageReader.scala index 47e68cf888..518cad8803 100644 --- a/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/GrpcBigQueryStorageReader.scala +++ b/google-cloud-bigquery-storage/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/storage/scaladsl/GrpcBigQueryStorageReader.scala @@ -34,7 +34,7 @@ object GrpcBigQueryStorageReader { * An extension that manages a single gRPC scala reader client per actor system. */ final class GrpcBigQueryStorageReaderExt private (sys: ExtendedActorSystem) extends Extension { - implicit val reader = GrpcBigQueryStorageReader()(sys) + implicit val reader: GrpcBigQueryStorageReader = GrpcBigQueryStorageReader()(sys) } object GrpcBigQueryStorageReaderExt extends ExtensionId[GrpcBigQueryStorageReaderExt] with ExtensionIdProvider { diff --git a/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala b/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala index 270796a947..7a702ee069 100644 --- a/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala +++ b/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala @@ -29,7 +29,7 @@ import scala.concurrent.Future class ExampleReader { - implicit val sys = ActorSystem("ExampleReader") + implicit val sys: ActorSystem = ActorSystem("ExampleReader") //#read-all val sourceOfSources: Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] = diff --git a/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala b/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala index d28f22ed48..04caceb4d1 100644 --- a/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala +++ b/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala @@ -6,6 +6,7 @@ package akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray import spray.json._ +import scala.collection.immutable import scala.reflect.ClassTag trait BigQueryCollectionFormats { @@ -14,7 +15,7 @@ trait BigQueryCollectionFormats { * Supplies the BigQueryJsonFormat for Lists. */ implicit def listFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[List[T]] = new BigQueryJsonFormat[List[T]] { - def write(list: List[T]) = JsArray(list.map(_.toJson).toVector) + def write(list: List[T]): JsArray = JsArray(list.map(_.toJson).toVector) def read(value: JsValue): List[T] = value match { case JsArray(elements) => elements.iterator.map(_.asJsObject.fields("v").convertTo[T]).toList case x => deserializationError("Expected List as JsArray, but got " + x) @@ -26,8 +27,8 @@ trait BigQueryCollectionFormats { */ implicit def arrayFormat[T: BigQueryJsonFormat: ClassTag]: BigQueryJsonFormat[Array[T]] = new BigQueryJsonFormat[Array[T]] { - def write(array: Array[T]) = JsArray(array.map(_.toJson).toVector) - def read(value: JsValue) = value match { + def write(array: Array[T]): JsArray = JsArray(array.map(_.toJson).toVector) + def read(value: JsValue): Array[T] = value match { case JsArray(elements) => elements.map(_.asJsObject.fields("v").convertTo[T]).toArray[T] case x => deserializationError("Expected Array as JsArray, but got " + x) } @@ -35,18 +36,25 @@ trait BigQueryCollectionFormats { import collection.{immutable => imm} - implicit def immIterableFormat[T: BigQueryJsonFormat] = viaSeq[imm.Iterable[T], T](seq => imm.Iterable(seq: _*)) - implicit def immSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.Seq[T], T](seq => imm.Seq(seq: _*)) - implicit def immIndexedSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.IndexedSeq[T], T](seq => imm.IndexedSeq(seq: _*)) - implicit def immLinearSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.LinearSeq[T], T](seq => imm.LinearSeq(seq: _*)) - implicit def vectorFormat[T: BigQueryJsonFormat] = viaSeq[Vector[T], T](seq => Vector(seq: _*)) + implicit def immIterableFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[immutable.Iterable[T]] = + viaSeq[imm.Iterable[T], T](seq => imm.Iterable(seq: _*)) + implicit def immSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.Seq[T]] = + viaSeq[imm.Seq[T], T](seq => imm.Seq(seq: _*)) + implicit def immIndexedSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.IndexedSeq[T]] = + viaSeq[imm.IndexedSeq[T], T](seq => imm.IndexedSeq(seq: _*)) + implicit def immLinearSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.LinearSeq[T]] = + viaSeq[imm.LinearSeq[T], T](seq => imm.LinearSeq(seq: _*)) + implicit def vectorFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.Vector[T]] = + viaSeq[imm.Vector[T], T](seq => imm.Vector(seq: _*)) - import collection._ - - implicit def iterableFormat[T: BigQueryJsonFormat] = viaSeq[Iterable[T], T](seq => Iterable(seq: _*)) - implicit def seqFormat[T: BigQueryJsonFormat] = viaSeq[Seq[T], T](seq => Seq(seq: _*)) - implicit def indexedSeqFormat[T: BigQueryJsonFormat] = viaSeq[IndexedSeq[T], T](seq => IndexedSeq(seq: _*)) - implicit def linearSeqFormat[T: BigQueryJsonFormat] = viaSeq[LinearSeq[T], T](seq => LinearSeq(seq: _*)) + implicit def iterableFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[collection.Iterable[T]] = + viaSeq[collection.Iterable[T], T](seq => collection.Iterable(seq: _*)) + implicit def seqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[collection.Seq[T]] = + viaSeq[collection.Seq[T], T](seq => collection.Seq(seq: _*)) + implicit def indexedSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[collection.IndexedSeq[T]] = + viaSeq[collection.IndexedSeq[T], T](seq => collection.IndexedSeq(seq: _*)) + implicit def linearSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[collection.LinearSeq[T]] = + viaSeq[collection.LinearSeq[T], T](seq => collection.LinearSeq(seq: _*)) /** * A BigQueryJsonFormat construction helper that creates a BigQueryJsonFormat for an Iterable type I from a builder function @@ -54,8 +62,8 @@ trait BigQueryCollectionFormats { */ def viaSeq[I <: Iterable[T], T: BigQueryJsonFormat](f: imm.Seq[T] => I): BigQueryJsonFormat[I] = new BigQueryJsonFormat[I] { - def write(iterable: I) = JsArray(iterable.map(_.toJson).toVector) - def read(value: JsValue) = value match { + def write(iterable: I): JsArray = JsArray(iterable.map(_.toJson).toVector) + def read(value: JsValue): I = value match { case JsArray(elements) => f(elements.map(_.asJsObject.fields("v").convertTo[T])) case x => deserializationError("Expected Collection as JsArray, but got " + x) } diff --git a/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala b/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala index 64d8f0b0de..c716fe3185 100644 --- a/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala +++ b/google-cloud-bigquery/src/main/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray +import spray.json import spray.json.{deserializationError, DefaultJsonProtocol, JsNumber, JsValue, JsonFormat} import scala.concurrent.duration.{DurationLong, FiniteDuration} @@ -13,18 +14,29 @@ import scala.concurrent.duration.{DurationLong, FiniteDuration} */ trait BigQueryRestBasicFormats { - implicit val IntJsonFormat = DefaultJsonProtocol.IntJsonFormat - implicit val FloatJsonFormat = DefaultJsonProtocol.FloatJsonFormat - implicit val DoubleJsonFormat = DefaultJsonProtocol.DoubleJsonFormat - implicit val ByteJsonFormat = DefaultJsonProtocol.ByteJsonFormat - implicit val ShortJsonFormat = DefaultJsonProtocol.ShortJsonFormat - implicit val BigDecimalJsonFormat = DefaultJsonProtocol.BigDecimalJsonFormat - implicit val BigIntJsonFormat = DefaultJsonProtocol.BigIntJsonFormat - implicit val UnitJsonFormat = DefaultJsonProtocol.UnitJsonFormat - implicit val BooleanJsonFormat = DefaultJsonProtocol.BooleanJsonFormat - implicit val CharJsonFormat = DefaultJsonProtocol.CharJsonFormat - implicit val StringJsonFormat = DefaultJsonProtocol.StringJsonFormat - implicit val SymbolJsonFormat = DefaultJsonProtocol.SymbolJsonFormat + implicit val IntJsonFormat: json.DefaultJsonProtocol.IntJsonFormat.type = DefaultJsonProtocol.IntJsonFormat + implicit val FloatJsonFormat: json.DefaultJsonProtocol.FloatJsonFormat.type = + DefaultJsonProtocol.FloatJsonFormat + implicit val DoubleJsonFormat: json.DefaultJsonProtocol.DoubleJsonFormat.type = + DefaultJsonProtocol.DoubleJsonFormat + implicit val ByteJsonFormat: json.DefaultJsonProtocol.ByteJsonFormat.type = + DefaultJsonProtocol.ByteJsonFormat + implicit val ShortJsonFormat: json.DefaultJsonProtocol.ShortJsonFormat.type = + DefaultJsonProtocol.ShortJsonFormat + implicit val BigDecimalJsonFormat: json.DefaultJsonProtocol.BigDecimalJsonFormat.type = + DefaultJsonProtocol.BigDecimalJsonFormat + implicit val BigIntJsonFormat: json.DefaultJsonProtocol.BigIntJsonFormat.type = + DefaultJsonProtocol.BigIntJsonFormat + implicit val UnitJsonFormat: json.DefaultJsonProtocol.UnitJsonFormat.type = + DefaultJsonProtocol.UnitJsonFormat + implicit val BooleanJsonFormat: json.DefaultJsonProtocol.BooleanJsonFormat.type = + DefaultJsonProtocol.BooleanJsonFormat + implicit val CharJsonFormat: json.DefaultJsonProtocol.CharJsonFormat.type = + DefaultJsonProtocol.CharJsonFormat + implicit val StringJsonFormat: json.DefaultJsonProtocol.StringJsonFormat.type = + DefaultJsonProtocol.StringJsonFormat + implicit val SymbolJsonFormat: json.DefaultJsonProtocol.SymbolJsonFormat.type = + DefaultJsonProtocol.SymbolJsonFormat implicit object BigQueryLongJsonFormat extends JsonFormat[Long] { def write(x: Long) = JsNumber(x) diff --git a/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala b/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala index 26d12f22a9..4e14117e35 100644 --- a/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala +++ b/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala @@ -4,12 +4,14 @@ package akka.stream.alpakka.googlecloud.bigquery.e2e.scaladsl -import akka.actor.ActorSystem +import akka.actor.{ActorSystem, Scheduler} import akka.{pattern, Done} import akka.stream.alpakka.googlecloud.bigquery.HoverflySupport import akka.stream.alpakka.googlecloud.bigquery.e2e.{A, B, C} import akka.stream.alpakka.googlecloud.bigquery.model.JobState import akka.stream.alpakka.googlecloud.bigquery.model.TableReference +import akka.stream.alpakka.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter +import akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat import akka.testkit.TestKit import io.specto.hoverfly.junit.core.{HoverflyMode, SimulationSource} import org.scalatest.BeforeAndAfterAll @@ -39,14 +41,14 @@ class BigQueryEndToEndSpec } } - override def afterAll() = { + override def afterAll(): Unit = { system.terminate() if (hoverfly.getMode == HoverflyMode.CAPTURE) hoverfly.exportSimulation(new File("hoverfly/BigQueryEndToEndSpec.json").toPath) super.afterAll() } - implicit def scheduler = system.scheduler + implicit def scheduler: Scheduler = system.scheduler "BigQuery Scala DSL" should { @@ -56,12 +58,12 @@ class BigQueryEndToEndSpec import akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryJsonProtocol._ import akka.stream.scaladsl.{Sink, Source} - implicit val cFormat = bigQueryJsonFormat5(C) - implicit val bFormat = bigQueryJsonFormat3(B) - implicit val aFormat = bigQueryJsonFormat7(A) - implicit val cSchema = bigQuerySchema5(C) - implicit val bSchema = bigQuerySchema3(B) - implicit val aSchema = bigQuerySchema7(A) + implicit val cFormat: BigQueryRootJsonFormat[C] = bigQueryJsonFormat5(C) + implicit val bFormat: BigQueryRootJsonFormat[B] = bigQueryJsonFormat3(B) + implicit val aFormat: BigQueryRootJsonFormat[A] = bigQueryJsonFormat7(A) + implicit val cSchema: TableSchemaWriter[C] = bigQuerySchema5(C) + implicit val bSchema: TableSchemaWriter[B] = bigQuerySchema3(B) + implicit val aSchema: TableSchemaWriter[A] = bigQuerySchema7(A) "create dataset" in { BigQuery.createDataset(datasetId).map { dataset => diff --git a/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala b/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala index 2196a7e1b2..0556496da2 100644 --- a/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala +++ b/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala @@ -40,38 +40,38 @@ class BigQueryQueriesSpec jsonFormat10(QueryResponse[T]) } - implicit val settings = GoogleSettings().copy(credentials = NoCredentials("", "")) + implicit val settings: GoogleSettings = GoogleSettings().copy(credentials = NoCredentials("", "")) val jobId = "jobId" val pageToken = "pageToken" - val incompleteQuery = QueryResponse[JsValue]( + val incompleteQuery: QueryResponse[JsValue] = QueryResponse[JsValue]( None, JobReference(Some(settings.projectId), Some(jobId), None), None, None, None, None, - false, + jobComplete = false, None, None, None ) - val completeQuery = incompleteQuery.copy[JsValue]( + val completeQuery: QueryResponse[JsValue] = incompleteQuery.copy[JsValue]( jobComplete = true, rows = Some(JsString("firstPage") :: Nil) ) - val completeQueryWith2ndPage = completeQuery.copy[JsValue]( + val completeQueryWith2ndPage: QueryResponse[JsValue] = completeQuery.copy[JsValue]( pageToken = Some(pageToken) ) - val query2ndPage = completeQuery.copy[JsValue]( + val query2ndPage: QueryResponse[JsValue] = completeQuery.copy[JsValue]( rows = Some(JsString("secondPage") :: Nil) ) - val completeQueryWithoutJobId = completeQuery.copy[JsValue]( + val completeQueryWithoutJobId: QueryResponse[JsValue] = completeQuery.copy[JsValue]( jobReference = JobReference(None, None, None) ) diff --git a/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala b/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala index 2484da2be7..e1c93fba83 100644 --- a/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala +++ b/google-cloud-bigquery/src/test/scala/akka/stream/alpakka/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala @@ -50,8 +50,8 @@ class BigQueryJsonProtocolSpec extends BigQueryJsonProtocol with AnyWordSpecLike case class Record(name: Option[String], addresses: Seq[Address]) case class Address(street: Option[String], city: Option[String]) - implicit val addressFormat = bigQueryJsonFormat2(Address) - implicit val recordFormat = bigQueryJsonFormat2(Record) + implicit val addressFormat: BigQueryRootJsonFormat[Address] = bigQueryJsonFormat2(Address) + implicit val recordFormat: BigQueryRootJsonFormat[Record] = bigQueryJsonFormat2(Record) "BigQueryJsonProtocol" should { diff --git a/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala b/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala index a507f770ae..0d045ac2f3 100644 --- a/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala +++ b/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala @@ -20,7 +20,9 @@ import akka.stream.alpakka.googlecloud.bigquery.model.{ } import akka.stream.alpakka.googlecloud.bigquery.scaladsl.BigQuery import akka.stream.alpakka.googlecloud.bigquery.scaladsl.schema.BigQuerySchemas._ +import akka.stream.alpakka.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter import akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryJsonProtocol._ +import akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat import akka.stream.scaladsl.{Flow, Sink, Source} import akka.{Done, NotUsed} @@ -38,8 +40,8 @@ class BigQueryDoc { //#setup case class Person(name: String, age: Int, addresses: Seq[Address], isHakker: Boolean) case class Address(street: String, city: String, postalCode: Option[Int]) - implicit val addressFormat = bigQueryJsonFormat3(Address) - implicit val personFormat = bigQueryJsonFormat4(Person) + implicit val addressFormat: BigQueryRootJsonFormat[Address] = bigQueryJsonFormat3(Address) + implicit val personFormat: BigQueryRootJsonFormat[Person] = bigQueryJsonFormat4(Person) //#setup @nowarn("msg=dead code") @@ -103,8 +105,8 @@ class BigQueryDoc { //#table-methods //#create-table - implicit val addressSchema = bigQuerySchema3(Address) - implicit val personSchema = bigQuerySchema4(Person) + implicit val addressSchema: TableSchemaWriter[Address] = bigQuerySchema3(Address) + implicit val personSchema: TableSchemaWriter[Person] = bigQuerySchema4(Person) val newTable: Future[Table] = BigQuery.createTable[Person](datasetId, "newTableId") //#create-table diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala index 3c7184de35..22e1a5a30e 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala @@ -64,7 +64,7 @@ object GrpcPublisher { * An extension that manages a single gRPC java publisher client per actor system. */ final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension { - implicit val publisher = GrpcPublisher.create(sys) + implicit val publisher: GrpcPublisher = GrpcPublisher.create(sys) } object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala index 8a3751b9d4..e8044a7136 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala @@ -65,7 +65,7 @@ object GrpcSubscriber { * An extension that manages a single gRPC java subscriber client per actor system. */ final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension { - implicit val subscriber = GrpcSubscriber.create(sys) + implicit val subscriber: GrpcSubscriber = GrpcSubscriber.create(sys) } object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala index 1c93581307..3591e169c5 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala @@ -55,7 +55,7 @@ object GrpcPublisher { * An extension that manages a single gRPC scala publisher client per actor system. */ final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension { - implicit val publisher = GrpcPublisher(sys: ActorSystem) + implicit val publisher: GrpcPublisher = GrpcPublisher(sys: ActorSystem) } object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala index 2eadbd9b90..c70909a059 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala @@ -55,7 +55,7 @@ object GrpcSubscriber { * An extension that manages a single gRPC scala subscriber client per actor system. */ final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension { - implicit val subscriber = GrpcSubscriber(sys: ActorSystem) + implicit val subscriber: GrpcSubscriber = GrpcSubscriber(sys: ActorSystem) } object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/ExampleApp.scala b/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/ExampleApp.scala index a7333665ba..4aa99d93cb 100644 --- a/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/ExampleApp.scala +++ b/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/ExampleApp.scala @@ -27,7 +27,7 @@ object ExampleApp { |akka.loglevel = INFO """.stripMargin) - implicit val sys = ActorSystem("ExampleApp", config) + implicit val sys: ActorSystem = ActorSystem("ExampleApp", config) import sys.dispatcher diff --git a/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala b/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala index dbf1a31c97..341a43f423 100644 --- a/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala +++ b/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala @@ -39,9 +39,9 @@ class IntegrationSpec with OptionValues with LogCapturing { - implicit val system = ActorSystem("IntegrationSpec") + implicit val system: ActorSystem = ActorSystem("IntegrationSpec") - implicit val defaultPatience = PatienceConfig(timeout = 15.seconds, interval = 50.millis) + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 15.seconds, interval = 50.millis) "connector" should { diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala index 41ce1c0c40..b6e05be7a1 100644 --- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala +++ b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala @@ -13,7 +13,7 @@ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model.HttpMethods.POST import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.{FromResponseUnmarshaller, Unmarshal, Unmarshaller} -import akka.stream.alpakka.google.GoogleAttributes +import akka.stream.alpakka.google.{GoogleAttributes, GoogleSettings, RequestSettings} import akka.stream.alpakka.google.http.GoogleHttp import akka.stream.alpakka.google.implicits._ import akka.stream.alpakka.googlecloud.pubsub._ @@ -40,7 +40,7 @@ private[pubsub] object PubSubApi extends PubSubApi { val PubSubGoogleApisHost: String = PubSubEmulatorHost.getOrElse(DefaultPubSubGoogleApisHost) val PubSubGoogleApisPort: Int = PubSubEmulatorPort.getOrElse(DefaultPubSubGoogleApisPort) - override def isEmulated = PubSubEmulatorHost.nonEmpty + override def isEmulated: Boolean = PubSubEmulatorHost.nonEmpty private[pubsub] lazy val PubSubEmulatorHost: Option[String] = sys.props .get(PubSubEmulatorHostVarName) @@ -59,7 +59,7 @@ private[pubsub] trait PubSubApi { def PubSubGoogleApisPort: Int def isEmulated: Boolean - private implicit val instantFormat = new RootJsonFormat[Instant] { + private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] { override def read(jsValue: JsValue): Instant = jsValue match { case JsString(time) => Instant.parse(time) case _ => deserializationError("Instant required as a string of RFC3339 UTC Zulu format.") @@ -67,7 +67,7 @@ private[pubsub] trait PubSubApi { override def write(instant: Instant): JsValue = JsString(instant.toString) } - private implicit val pubSubMessageFormat = + private implicit val pubSubMessageFormat: RootJsonFormat[PubSubMessage] = new RootJsonFormat[PubSubMessage] { override def read(json: JsValue): PubSubMessage = { val fields = json.asJsObject.fields @@ -91,7 +91,7 @@ private[pubsub] trait PubSubApi { ) } - private implicit val publishMessageFormat = new RootJsonFormat[PublishMessage] { + private implicit val publishMessageFormat: RootJsonFormat[PublishMessage] = new RootJsonFormat[PublishMessage] { def read(json: JsValue): PublishMessage = { val data = json.asJsObject.fields("data").convertTo[String] val attributes = json.asJsObject.fields("attributes").convertTo[immutable.Map[String, String]] @@ -106,37 +106,38 @@ private[pubsub] trait PubSubApi { ) } - private implicit val pubSubRequestFormat = new RootJsonFormat[PublishRequest] { + private implicit val pubSubRequestFormat: RootJsonFormat[PublishRequest] = new RootJsonFormat[PublishRequest] { def read(json: JsValue): PublishRequest = PublishRequest(json.asJsObject.fields("messages").convertTo[immutable.Seq[PublishMessage]]) def write(pr: PublishRequest): JsValue = JsObject("messages" -> pr.messages.toJson) } - private implicit val gcePubSubResponseFormat = new RootJsonFormat[PublishResponse] { + private implicit val gcePubSubResponseFormat: RootJsonFormat[PublishResponse] = new RootJsonFormat[PublishResponse] { def read(json: JsValue): PublishResponse = PublishResponse(json.asJsObject.fields("messageIds").convertTo[immutable.Seq[String]]) def write(pr: PublishResponse): JsValue = JsObject("messageIds" -> pr.messageIds.toJson) } - private implicit val receivedMessageFormat = new RootJsonFormat[ReceivedMessage] { + private implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = new RootJsonFormat[ReceivedMessage] { def read(json: JsValue): ReceivedMessage = ReceivedMessage(json.asJsObject.fields("ackId").convertTo[String], json.asJsObject.fields("message").convertTo[PubSubMessage]) def write(rm: ReceivedMessage): JsValue = JsObject("ackId" -> rm.ackId.toJson, "message" -> rm.message.toJson) } - private implicit val pubSubPullResponseFormat = new RootJsonFormat[PullResponse] { + private implicit val pubSubPullResponseFormat: RootJsonFormat[PullResponse] = new RootJsonFormat[PullResponse] { def read(json: JsValue): PullResponse = PullResponse(json.asJsObject.fields.get("receivedMessages").map(_.convertTo[immutable.Seq[ReceivedMessage]])) def write(pr: PullResponse): JsValue = pr.receivedMessages.map(rm => JsObject("receivedMessages" -> rm.toJson)).getOrElse(JsObject.empty) } - private implicit val acknowledgeRequestFormat = new RootJsonFormat[AcknowledgeRequest] { - def read(json: JsValue): AcknowledgeRequest = - AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*) - def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson) - } - private implicit val pullRequestFormat = DefaultJsonProtocol.jsonFormat2(PullRequest) + private implicit val acknowledgeRequestFormat: RootJsonFormat[AcknowledgeRequest] = + new RootJsonFormat[AcknowledgeRequest] { + def read(json: JsValue): AcknowledgeRequest = + AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*) + def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson) + } + private implicit val pullRequestFormat: RootJsonFormat[PullRequest] = DefaultJsonProtocol.jsonFormat2(PullRequest) private def scheme: String = if (isEmulated) "http" else "https" @@ -144,8 +145,8 @@ private[pubsub] trait PubSubApi { Flow .fromMaterializer { (mat, attr) => import mat.executionContext - implicit val settings = GoogleAttributes.resolveSettings(mat, attr) - implicit val requestSettings = settings.requestSettings + implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) + implicit val requestSettings: RequestSettings = settings.requestSettings val url: Uri = Uri.from( scheme = scheme, @@ -183,8 +184,8 @@ private[pubsub] trait PubSubApi { Flow .fromMaterializer { (mat, attr) => import mat.executionContext - implicit val settings = GoogleAttributes.resolveSettings(mat, attr) - implicit val requestSettings = settings.requestSettings + implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) + implicit val requestSettings: RequestSettings = settings.requestSettings val url: Uri = Uri.from( scheme = scheme, @@ -238,8 +239,8 @@ private[pubsub] trait PubSubApi { Flow .fromMaterializer { (mat, attr) => import mat.executionContext - implicit val system = mat.system - implicit val settings = GoogleAttributes.resolveSettings(mat, attr) + implicit val system: ActorSystem = mat.system + implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) val url: Uri = s"/v1/projects/${settings.projectId}/topics/$topic:publish" FlowWithContext[PublishRequest, T] .mapAsync(parallelism) { request => diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala index 15508b46fa..eaeb6fa182 100644 --- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala @@ -25,10 +25,10 @@ import scala.concurrent.duration._ class GooglePubSubSpec extends AnyFlatSpec with ScalaFutures with Matchers with LogCapturing with BeforeAndAfterAll { - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis) - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() override protected def afterAll(): Unit = { TestKit.shutdownActorSystem(system) diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala index f0871ea118..ef106e0d3e 100644 --- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala @@ -43,7 +43,7 @@ class NoopTrustManager extends X509TrustManager { class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with ScalaFutures with Matchers with LogCapturing { - implicit val system = ActorSystem( + implicit val system: ActorSystem = ActorSystem( "PubSubApiSpec", ConfigFactory .parseString( @@ -52,7 +52,7 @@ class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with ScalaFutures .withFallback(ConfigFactory.load()) ) - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis) def createInsecureSslEngine(host: String, port: Int): SSLEngine = { diff --git a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala index ce28cb2849..c9f43f8dd1 100644 --- a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala +++ b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala @@ -21,7 +21,7 @@ import scala.concurrent.{Future, Promise} class ExampleUsage { //#init-system - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() val config = PubSubConfig() val topic = "topic1" val subscription = "subscription1" diff --git a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala index e9f18baddb..06a46ebe45 100644 --- a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala @@ -35,7 +35,7 @@ class IntegrationSpec with OptionValues with LogCapturing { - private implicit val system = ActorSystem("IntegrationSpec") + private implicit val system: ActorSystem = ActorSystem("IntegrationSpec") override def afterAll(): Unit = TestKit.shutdownActorSystem(system) diff --git a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala index 878d0417c3..a441d6b533 100644 --- a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala +++ b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala @@ -16,13 +16,15 @@ import scala.util.Try object Formats extends DefaultJsonProtocol { private final case class CustomerEncryption(encryptionAlgorithm: String, keySha256: String) - private implicit val customerEncryptionJsonFormat = jsonFormat2(CustomerEncryption) + private implicit val customerEncryptionJsonFormat: RootJsonFormat[CustomerEncryption] = jsonFormat2( + CustomerEncryption + ) private final case class Owner(entity: String, entityId: Option[String]) - private implicit val OwnerJsonFormat = jsonFormat2(Owner) + private implicit val OwnerJsonFormat: RootJsonFormat[Owner] = jsonFormat2(Owner) private final case class ProjectTeam(projectNumber: String, team: String) - private implicit val ProjectTeamJsonFormat = jsonFormat2(ProjectTeam) + private implicit val ProjectTeamJsonFormat: RootJsonFormat[ProjectTeam] = jsonFormat2(ProjectTeam) private final case class ObjectAccessControls(kind: String, id: String, @@ -37,7 +39,9 @@ object Formats extends DefaultJsonProtocol { domain: String, projectTeam: ProjectTeam, etag: String) - private implicit val ObjectAccessControlsJsonFormat = jsonFormat13(ObjectAccessControls) + private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] = jsonFormat13( + ObjectAccessControls + ) /** * Google API storage response object @@ -68,7 +72,9 @@ object Formats extends DefaultJsonProtocol { updated: String ) - private implicit val storageObjectReadOnlyJson = jsonFormat18(StorageObjectReadOnlyJson) + private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] = jsonFormat18( + StorageObjectReadOnlyJson + ) // private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue private final case class StorageObjectWriteableJson( @@ -88,7 +94,9 @@ object Formats extends DefaultJsonProtocol { acl: Option[List[ObjectAccessControls]] ) - private implicit val storageObjectWritableJson = jsonFormat14(StorageObjectWriteableJson) + private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] = jsonFormat14( + StorageObjectWriteableJson + ) private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] { override def read(value: JsValue): StorageObjectJson = { @@ -122,7 +130,7 @@ object Formats extends DefaultJsonProtocol { items: Option[List[StorageObjectJson]] ) - private implicit val bucketInfoJsonFormat = jsonFormat6(BucketInfoJson) + private implicit val bucketInfoJsonFormat: RootJsonFormat[BucketInfoJson] = jsonFormat6(BucketInfoJson) /** * Google API rewrite response object @@ -138,7 +146,7 @@ object Formats extends DefaultJsonProtocol { resource: Option[StorageObjectJson] ) - private implicit val rewriteResponseFormat = jsonFormat6(RewriteResponseJson) + private implicit val rewriteResponseFormat: RootJsonFormat[RewriteResponseJson] = jsonFormat6(RewriteResponseJson) /** * Google API bucket response object @@ -154,7 +162,7 @@ object Formats extends DefaultJsonProtocol { etag: String ) - implicit val bucketInfoFormat = jsonFormat2(BucketInfo) + implicit val bucketInfoFormat: RootJsonFormat[BucketInfo] = jsonFormat2(BucketInfo) implicit object BucketListResultReads extends RootJsonReader[BucketListResult] { override def read(json: JsValue): BucketListResult = { @@ -168,7 +176,9 @@ object Formats extends DefaultJsonProtocol { } } - private implicit val bucketListResultJsonReads = jsonFormat4(BucketListResultJson) + private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] = jsonFormat4( + BucketListResultJson + ) implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] { override def read(json: JsValue): RewriteResponse = { diff --git a/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/WithMaterializerGlobal.scala b/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/WithMaterializerGlobal.scala index b1548a5093..62ca55bc09 100644 --- a/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/WithMaterializerGlobal.scala +++ b/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/WithMaterializerGlobal.scala @@ -10,8 +10,8 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext} trait WithMaterializerGlobal extends AnyWordSpecLike @@ -20,8 +20,8 @@ trait WithMaterializerGlobal with ScalaFutures with IntegrationPatience with Matchers { - implicit val actorSystem = ActorSystem("test") - implicit val ec = actorSystem.dispatcher + implicit val actorSystem: ActorSystem = ActorSystem("test") + implicit val ec: ExecutionContext = actorSystem.dispatcher override protected def afterAll(): Unit = { super.afterAll() diff --git a/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala b/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala index 2ec122c529..7ec5b1c513 100644 --- a/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala +++ b/google-cloud-storage/src/test/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala @@ -43,7 +43,7 @@ class GCStorageStreamIntegrationSpec with ScalaFutures with LogCapturing { - private implicit val defaultPatience = + private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 60.seconds, interval = 60.millis) var folderName: String = _ @@ -55,9 +55,9 @@ class GCStorageStreamIntegrationSpec def bucket = "alpakka" def rewriteBucket = "alpakka-rewrite" - def projectId = settings.projectId - def clientEmail = settings.clientEmail - def privateKey = settings.privateKey + def projectId: String = settings.projectId + def clientEmail: String = settings.clientEmail + def privateKey: String = settings.privateKey before { folderName = classOf[GCStorageStreamIntegrationSpec].getSimpleName + UUID.randomUUID().toString + "/" diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/PaginatedRequest.scala b/google-common/src/main/scala/akka/stream/alpakka/google/PaginatedRequest.scala index d59524b321..de917d7fb5 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/PaginatedRequest.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/PaginatedRequest.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.google +import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.http.scaladsl.model.HttpMethods.GET @@ -42,8 +43,8 @@ private[alpakka] object PaginatedRequest { Source .fromMaterializer { (mat, attr) => - implicit val system = mat.system - implicit val settings = GoogleAttributes.resolveSettings(mat, attr) + implicit val system: ActorSystem = mat.system + implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) val requestWithPageToken = addPageToken(request, query) Source.unfoldAsync[Either[Done, Option[String]], Out](Right(initialPageToken)) { diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/ResumableUpload.scala b/google-common/src/main/scala/akka/stream/alpakka/google/ResumableUpload.scala index eaa5c5158b..3fe9bf2f8c 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/ResumableUpload.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/ResumableUpload.scala @@ -5,6 +5,7 @@ package akka.stream.alpakka.google import akka.NotUsed +import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.http.scaladsl.model.HttpMethods.{POST, PUT} import akka.http.scaladsl.model.StatusCodes.{Created, OK, PermanentRedirect} @@ -46,8 +47,8 @@ private[alpakka] object ResumableUpload { Sink .fromMaterializer { (mat, attr) => import mat.executionContext - implicit val materializer = mat - implicit val settings = GoogleAttributes.resolveSettings(mat, attr) + implicit val materializer: Materializer = mat + implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) val uploadChunkSize = settings.requestSettings.uploadChunkSize val in = Flow[ByteString] @@ -86,13 +87,14 @@ private[alpakka] object ResumableUpload { private def initiateSession(request: HttpRequest)(implicit mat: Materializer, settings: GoogleSettings): Future[Uri] = { - implicit val system = mat.system + implicit val system: ActorSystem = mat.system import implicits._ - implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => - response.discardEntityBytes().future.map { _ => - response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri) - } + implicit val um: FromResponseUnmarshaller[Uri] = Unmarshaller.withMaterializer { + implicit ec => implicit mat => response: HttpResponse => + response.discardEntityBytes().future.map { _ => + response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri) + } }.withDefaultRetry GoogleHttp().singleAuthenticatedRequest[Uri](request) @@ -103,7 +105,7 @@ private[alpakka] object ResumableUpload { private def uploadChunk[T: FromResponseUnmarshaller]( request: HttpRequest )(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = { - implicit val system = mat.system + implicit val system: ActorSystem = mat.system val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => response.status match { @@ -139,25 +141,26 @@ private[alpakka] object ResumableUpload { request: HttpRequest, chunk: Future[MaybeLast[Chunk]] )(implicit mat: Materializer, settings: GoogleSettings): Future[Either[T, MaybeLast[Chunk]]] = { - implicit val system = mat.system + implicit val system: ActorSystem = mat.system import implicits._ - implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => - response.status match { - case OK | Created => Unmarshal(response).to[T].map(Left(_)) - case PermanentRedirect => - response.discardEntityBytes().future.map { _ => - Right( - response - .header[Range] - .flatMap(_.ranges.headOption) - .collect { - case Slice(_, last) => last + 1 - } getOrElse 0L - ) - } - case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage)) - } + implicit val um: FromResponseUnmarshaller[Either[T, Long]] = Unmarshaller.withMaterializer { + implicit ec => implicit mat => response: HttpResponse => + response.status match { + case OK | Created => Unmarshal(response).to[T].map(Left(_)) + case PermanentRedirect => + response.discardEntityBytes().future.map { _ => + Right( + response + .header[Range] + .flatMap(_.ranges.headOption) + .collect { + case Slice(_, last) => last + 1 + } getOrElse 0L + ) + } + case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage)) + } }.withDefaultRetry import mat.executionContext diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleComputeMetadata.scala b/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleComputeMetadata.scala index 774eba0a42..806745de0e 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleComputeMetadata.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleComputeMetadata.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.google.auth +import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.http.scaladsl.Http import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport @@ -33,7 +34,7 @@ private[auth] object GoogleComputeMetadata { ): Future[AccessToken] = { import SprayJsonSupport._ import mat.executionContext - implicit val system = mat.system + implicit val system: ActorSystem = mat.system for { response <- Http().singleRequest(tokenRequest) token <- Unmarshal(response.entity).to[AccessToken] @@ -44,7 +45,7 @@ private[auth] object GoogleComputeMetadata { implicit mat: Materializer ): Future[String] = { import mat.executionContext - implicit val system = mat.system + implicit val system: ActorSystem = mat.system for { response <- Http().singleRequest(projectIdRequest) projectId <- Unmarshal(response.entity).to[String] diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2.scala b/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2.scala index 3439f07789..61bebde794 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.google.auth +import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.model.HttpMethods.POST @@ -33,7 +34,7 @@ private[auth] object GoogleOAuth2 { import GoogleOAuth2Exception._ import SprayJsonSupport._ import implicits._ - implicit val system = mat.system + implicit val system: ActorSystem = mat.system try { val entity = FormData( diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Credentials.scala b/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Credentials.scala index 0ed363b7c3..1909e77135 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Credentials.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Credentials.scala @@ -29,7 +29,7 @@ private[auth] final class GoogleOAuth2Credentials(credentials: OAuth2Credentials Await.result(requestMetadata, Duration.Inf) override def getRequestMetadata(uri: URI, executor: Executor, callback: RequestMetadataCallback): Unit = { - implicit val ec = ExecutionContext.fromExecutor(executor) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor) requestMetadata.onComplete { case Success(metadata) => callback.onSuccess(metadata) case Failure(ex) => callback.onFailure(ex) diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/auth/UserAccessMetadata.scala b/google-common/src/main/scala/akka/stream/alpakka/google/auth/UserAccessMetadata.scala index d9d8205cae..f74d13b1a0 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/auth/UserAccessMetadata.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/auth/UserAccessMetadata.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.google.auth +import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.http.scaladsl.Http import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport @@ -37,7 +38,7 @@ private[auth] object UserAccessMetadata { ): Future[AccessToken] = { import SprayJsonSupport._ import mat.executionContext - implicit val system = mat.system + implicit val system: ActorSystem = mat.system for { response <- Http().singleRequest(tokenRequest(clientId, clientSecret, refreshToken)) token <- Unmarshal(response.entity).to[AccessToken] diff --git a/google-common/src/main/scala/akka/stream/alpakka/google/http/GoogleHttp.scala b/google-common/src/main/scala/akka/stream/alpakka/google/http/GoogleHttp.scala index d1cad107e5..1f803a8f2f 100644 --- a/google-common/src/main/scala/akka/stream/alpakka/google/http/GoogleHttp.scala +++ b/google-common/src/main/scala/akka/stream/alpakka/google/http/GoogleHttp.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.google.http -import akka.actor.ClassicActorSystemProvider +import akka.actor.{ActorSystem, ClassicActorSystemProvider, Scheduler} import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.http.scaladsl.Http.HostConnectionPool @@ -12,11 +12,11 @@ import akka.http.scaladsl.model.headers.Authorization import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.unmarshalling.{FromResponseUnmarshaller, Unmarshal} import akka.http.scaladsl.{Http, HttpExt} -import akka.stream.alpakka.google.{GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings} import akka.stream.alpakka.google.util.Retry +import akka.stream.alpakka.google.{GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings} import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, RetryFlow} -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @InternalApi @@ -35,9 +35,9 @@ private[alpakka] object GoogleHttp { @InternalApi private[alpakka] final class GoogleHttp private (val http: HttpExt) extends AnyVal { - private implicit def system = http.system - private implicit def ec = system.dispatcher - private implicit def scheduler = system.scheduler + private implicit def system: ActorSystem = http.system + private implicit def ec: ExecutionContext = system.dispatcher + private implicit def scheduler: Scheduler = system.scheduler /** * Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]]. @@ -68,7 +68,7 @@ private[alpakka] final class GoogleHttp private (val http: HttpExt) extends AnyV implicit settings: GoogleSettings, um: FromResponseUnmarshaller[T] ): Future[T] = Retry(settings.requestSettings.retrySettings) { - implicit val requestSettings = settings.requestSettings + implicit val requestSettings: RequestSettings = settings.requestSettings addAuth(request).flatMap(singleRequest(_))(ExecutionContexts.parasitic) } @@ -104,7 +104,7 @@ private[alpakka] final class GoogleHttp private (val http: HttpExt) extends AnyV parallelism: Int = 1 ): FlowWithContext[HttpRequest, Ctx, Try[T], Ctx, Future[HostConnectionPool]] = FlowWithContext.fromTuples { Flow.fromMaterializer { (mat, attr) => - implicit val settings = GoogleAttributes.resolveSettings(mat, attr) + implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) val p = if (port == -1) if (https) 443 else 80 else port val uriFlow = FlowWithContext[HttpRequest, Ctx].map(addStandardQuery) @@ -160,7 +160,7 @@ private[alpakka] final class GoogleHttp private (val http: HttpExt) extends AnyV ) private def addAuth(request: HttpRequest)(implicit settings: GoogleSettings): Future[HttpRequest] = { - implicit val requestSettings = settings.requestSettings + implicit val requestSettings: RequestSettings = settings.requestSettings settings.credentials .get() .map { token => diff --git a/google-common/src/test/scala/akka/stream/alpakka/google/PaginatedRequestSpec.scala b/google-common/src/test/scala/akka/stream/alpakka/google/PaginatedRequestSpec.scala index a9068c1840..94d8853960 100644 --- a/google-common/src/test/scala/akka/stream/alpakka/google/PaginatedRequestSpec.scala +++ b/google-common/src/test/scala/akka/stream/alpakka/google/PaginatedRequestSpec.scala @@ -33,7 +33,7 @@ class PaginatedRequestSpec super.afterAll() } - implicit val patience = PatienceConfig(remainingOrDefault) + implicit val patience: PatienceConfig = PatienceConfig(remainingOrDefault) implicit val paginated: Paginated[JsValue] = _.asJsObject.fields.get("pageToken").flatMap { case JsString(value) => Some(value) case _ => None diff --git a/google-common/src/test/scala/akka/stream/alpakka/google/ResumableUploadSpec.scala b/google-common/src/test/scala/akka/stream/alpakka/google/ResumableUploadSpec.scala index 201a541528..8c31b6deb6 100644 --- a/google-common/src/test/scala/akka/stream/alpakka/google/ResumableUploadSpec.scala +++ b/google-common/src/test/scala/akka/stream/alpakka/google/ResumableUploadSpec.scala @@ -8,7 +8,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.HttpMethods.POST import akka.http.scaladsl.model.{ContentTypes, HttpRequest, Uri} -import akka.http.scaladsl.unmarshalling.Unmarshaller +import akka.http.scaladsl.unmarshalling.{FromResponseUnmarshaller, Unmarshaller} import akka.stream.alpakka.google.scaladsl.`X-Upload-Content-Type` import akka.stream.scaladsl.Source import akka.testkit.TestKit @@ -30,7 +30,7 @@ class ResumableUploadSpec with ScalaFutures with HoverflySupport { - implicit val patience = PatienceConfig(remainingOrDefault) + implicit val patience: PatienceConfig = PatienceConfig(remainingOrDefault) override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) @@ -71,7 +71,7 @@ class ResumableUploadSpec ) import implicits._ - implicit val um = + implicit val um: FromResponseUnmarshaller[JsValue] = Unmarshaller.messageUnmarshallerFromEntityUnmarshaller(sprayJsValueUnmarshaller).withDefaultRetry val result = Source diff --git a/google-common/src/test/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Spec.scala b/google-common/src/test/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Spec.scala index ae6ace31b5..63d90c7f63 100644 --- a/google-common/src/test/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Spec.scala +++ b/google-common/src/test/scala/akka/stream/alpakka/google/auth/GoogleOAuth2Spec.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.google.auth import akka.actor.ActorSystem -import akka.stream.alpakka.google.{GoogleSettings, HoverflySupport} +import akka.stream.alpakka.google.{GoogleSettings, HoverflySupport, RequestSettings} import akka.testkit.TestKit import io.specto.hoverfly.junit.core.SimulationSource.dsl import io.specto.hoverfly.junit.core.model.RequestFieldMatcher.newRegexMatcher @@ -32,11 +32,11 @@ class GoogleOAuth2Spec TestKit.shutdownActorSystem(system) super.afterAll() } - implicit val defaultPatience = PatienceConfig(remainingOrDefault) + implicit val defaultPatience: PatienceConfig = PatienceConfig(remainingOrDefault) implicit val executionContext: ExecutionContext = system.dispatcher - implicit val settings = GoogleSettings(system) - implicit val clock = Clock.systemUTC() + implicit val settings: GoogleSettings = GoogleSettings(system) + implicit val clock: Clock = Clock.systemUTC() lazy val privateKey = { val inputStream = getClass.getClassLoader.getResourceAsStream("private_pcks8.pem") @@ -65,7 +65,7 @@ class GoogleOAuth2Spec ) ) - implicit val settings = GoogleSettings().requestSettings + implicit val settings: RequestSettings = GoogleSettings().requestSettings GoogleOAuth2.getAccessToken("email", privateKey, scopes).futureValue should matchPattern { case AccessToken("token", exp) if exp > (System.currentTimeMillis / 1000L + 3000L) => } diff --git a/google-common/src/test/scala/akka/stream/alpakka/google/auth/OAuth2CredentialsSpec.scala b/google-common/src/test/scala/akka/stream/alpakka/google/auth/OAuth2CredentialsSpec.scala index 55cca17dd7..3910053d45 100644 --- a/google-common/src/test/scala/akka/stream/alpakka/google/auth/OAuth2CredentialsSpec.scala +++ b/google-common/src/test/scala/akka/stream/alpakka/google/auth/OAuth2CredentialsSpec.scala @@ -33,8 +33,8 @@ class OAuth2CredentialsSpec } import system.dispatcher - implicit val settings = GoogleSettings().requestSettings - implicit val clock = Clock.systemUTC() + implicit val settings: RequestSettings = GoogleSettings().requestSettings + implicit val clock: Clock = Clock.systemUTC() final object AccessTokenProvider { @volatile var accessTokenPromise: Promise[AccessToken] = Promise.failed(new RuntimeException) diff --git a/google-common/src/test/scala/akka/stream/alpakka/google/http/GoogleHttpSpec.scala b/google-common/src/test/scala/akka/stream/alpakka/google/http/GoogleHttpSpec.scala index 5b3e226cc8..0e3c8f7fab 100644 --- a/google-common/src/test/scala/akka/stream/alpakka/google/http/GoogleHttpSpec.scala +++ b/google-common/src/test/scala/akka/stream/alpakka/google/http/GoogleHttpSpec.scala @@ -66,7 +66,7 @@ class GoogleHttpSpec http } - implicit val settings = GoogleSettings().requestSettings + implicit val settings: RequestSettings = GoogleSettings().requestSettings "GoogleHttp" must { @@ -160,7 +160,7 @@ class GoogleHttpSpec Future.failed(GoogleOAuth2Exception(ErrorInfo())), Future.failed(new AnotherException) ) - implicit val settingsWithMockedCredentials = GoogleSettings().copy(credentials = credentials) + implicit val settingsWithMockedCredentials: GoogleSettings = GoogleSettings().copy(credentials = credentials) val http = mockHttp when( diff --git a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/v1/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/v1/impl/FcmSenderSpec.scala index a69e3dd7f0..0cc2fc750d 100644 --- a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/v1/impl/FcmSenderSpec.scala +++ b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/v1/impl/FcmSenderSpec.scala @@ -40,13 +40,13 @@ class FcmSenderSpec override def afterAll(): Unit = TestKit.shutdownActorSystem(system) - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 2.seconds, interval = 50.millis) implicit val executionContext: ExecutionContext = system.dispatcher - implicit val conf = FcmSettings() - implicit val settings = GoogleSettings().copy(projectId = "projectId") + implicit val conf: FcmSettings = FcmSettings() + implicit val settings: GoogleSettings = GoogleSettings().copy(projectId = "projectId") "FcmSender" should { diff --git a/google-fcm/src/test/scala/docs/scaladsl/FcmExamples.scala b/google-fcm/src/test/scala/docs/scaladsl/FcmExamples.scala index 2b264c071a..a27d3038f6 100644 --- a/google-fcm/src/test/scala/docs/scaladsl/FcmExamples.scala +++ b/google-fcm/src/test/scala/docs/scaladsl/FcmExamples.scala @@ -18,7 +18,7 @@ import scala.concurrent.Future class FcmExamples { - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() //#simple-send val fcmConfig = FcmSettings() diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseFlowStage.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseFlowStage.scala index 17d821bba4..c7322b8b13 100644 --- a/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseFlowStage.scala +++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseFlowStage.scala @@ -26,7 +26,7 @@ private[hbase] class HBaseFlowStage[A](settings: HTableSettings[A]) extends Grap override protected def logSource = classOf[HBaseFlowStage[A]] - implicit val connection = connect(settings.conf) + implicit val connection: Connection = connect(settings.conf) lazy val table: Table = getOrCreateTable(settings.tableName, settings.columnFamilies).get diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseSourceStage.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseSourceStage.scala index 61795d4e39..5d0295e631 100644 --- a/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseSourceStage.scala +++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/impl/HBaseSourceStage.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.hbase.impl import akka.stream.{Attributes, Outlet, SourceShape} import akka.stream.alpakka.hbase.HTableSettings import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging} -import org.apache.hadoop.hbase.client.{Result, Scan, Table} +import org.apache.hadoop.hbase.client.{Connection, Result, Scan, Table} import scala.util.control.NonFatal @@ -30,7 +30,7 @@ private[hbase] final class HBaseSourceLogic[A](scan: Scan, with StageLogging with HBaseCapabilities { - implicit val connection = connect(settings.conf) + implicit val connection: Connection = connect(settings.conf) lazy val table: Table = getOrCreateTable(settings.tableName, settings.columnFamilies).get private var results: java.util.Iterator[Result] = null diff --git a/hbase/src/test/scala/docs/scaladsl/HBaseStageSpec.scala b/hbase/src/test/scala/docs/scaladsl/HBaseStageSpec.scala index ee1b9ac502..ac981e0480 100644 --- a/hbase/src/test/scala/docs/scaladsl/HBaseStageSpec.scala +++ b/hbase/src/test/scala/docs/scaladsl/HBaseStageSpec.scala @@ -31,7 +31,7 @@ class HBaseStageSpec with BeforeAndAfterAll with LogCapturing { - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 500.millis) //#create-converter-put diff --git a/hdfs/src/test/scala/docs/scaladsl/HdfsReaderSpec.scala b/hdfs/src/test/scala/docs/scaladsl/HdfsReaderSpec.scala index 5735b37135..2f19478def 100644 --- a/hdfs/src/test/scala/docs/scaladsl/HdfsReaderSpec.scala +++ b/hdfs/src/test/scala/docs/scaladsl/HdfsReaderSpec.scala @@ -15,12 +15,12 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hadoop.io.Text import org.apache.hadoop.io.compress.DefaultCodec +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContextExecutor, Future} -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.{Await, ExecutionContext, Future} class HdfsReaderSpec extends AnyWordSpecLike @@ -40,7 +40,7 @@ class HdfsReaderSpec val fs: FileSystem = FileSystem.get(conf) val settings = HdfsWritingSettings() - implicit val ec: ExecutionContextExecutor = system.dispatcher + implicit val ec: ExecutionContext = system.dispatcher "HdfsSource" should { "read data file" in { diff --git a/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala b/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala index d2ed30fa1f..1ef2122eb8 100644 --- a/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala +++ b/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala @@ -18,12 +18,12 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.io.compress._ import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel import org.scalatest._ - -import scala.concurrent.duration.{Duration, _} -import scala.concurrent.{Await, ExecutionContextExecutor} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.duration.{Duration, _} + class HdfsWriterSpec extends AnyWordSpecLike with Matchers @@ -45,7 +45,7 @@ class HdfsWriterSpec val fs: FileSystem = FileSystem.get(conf) //#init-client - implicit val ec: ExecutionContextExecutor = system.dispatcher + implicit val ec: ExecutionContext = system.dispatcher fs.getConf.setEnum("zlib.compress.level", CompressionLevel.BEST_SPEED) diff --git a/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/HmsTokenApiSpec.scala b/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/HmsTokenApiSpec.scala index 6248e83b00..c390d110f4 100644 --- a/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/HmsTokenApiSpec.scala +++ b/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/HmsTokenApiSpec.scala @@ -36,7 +36,7 @@ class HmsTokenApiSpec override def afterAll() = TestKit.shutdownActorSystem(system) - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 2.seconds, interval = 50.millis) val config = HmsSettings() diff --git a/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/PushKitSenderSpec.scala b/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/PushKitSenderSpec.scala index 78ebb1a703..e05ab92d63 100644 --- a/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/PushKitSenderSpec.scala +++ b/huawei-push-kit/src/test/scala/akka/stream/alpakka/huawei/pushkit/impl/PushKitSenderSpec.scala @@ -38,12 +38,12 @@ class PushKitSenderSpec override def afterAll() = TestKit.shutdownActorSystem(system) - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 2.seconds, interval = 50.millis) implicit val executionContext: ExecutionContext = system.dispatcher - implicit val config = HmsSettings() + implicit val config: HmsSettings = HmsSettings() "HmsSender" should { diff --git a/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala b/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala index b4527e1ca8..4001c1af06 100644 --- a/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala +++ b/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala @@ -28,7 +28,7 @@ import scala.concurrent.Future class PushKitExamples { - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() //#simple-send val config = HmsSettings() diff --git a/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala b/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala index 35c423bb87..e48693ea4a 100644 --- a/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala +++ b/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala @@ -33,7 +33,7 @@ class FlowSpec with ScalaFutures with LogCapturing { - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() final val DatabaseName = this.getClass.getSimpleName diff --git a/influxdb/src/test/scala/docs/scaladsl/InfluxDbSourceSpec.scala b/influxdb/src/test/scala/docs/scaladsl/InfluxDbSourceSpec.scala index f1320cc76b..76b2ec218e 100644 --- a/influxdb/src/test/scala/docs/scaladsl/InfluxDbSourceSpec.scala +++ b/influxdb/src/test/scala/docs/scaladsl/InfluxDbSourceSpec.scala @@ -29,7 +29,7 @@ class InfluxDbSourceSpec final val DatabaseName = "InfluxDbSourceSpec" - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() implicit var influxDB: InfluxDB = _ diff --git a/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala b/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala index 3057494005..ce5e1dbb9e 100644 --- a/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala +++ b/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala @@ -31,7 +31,7 @@ class InfluxDbSpec with ScalaFutures with LogCapturing { - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() final val DatabaseName = this.getClass.getSimpleName diff --git a/ironmq/src/main/scala/akka/stream/alpakka/ironmq/impl/IronMqPullStage.scala b/ironmq/src/main/scala/akka/stream/alpakka/ironmq/impl/IronMqPullStage.scala index 6754f1d54d..aeffea7b06 100644 --- a/ironmq/src/main/scala/akka/stream/alpakka/ironmq/impl/IronMqPullStage.scala +++ b/ironmq/src/main/scala/akka/stream/alpakka/ironmq/impl/IronMqPullStage.scala @@ -12,7 +12,7 @@ import akka.stream.alpakka.ironmq._ import akka.stream.alpakka.ironmq.scaladsl.CommittableMessage import akka.stream.stage._ -import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @InternalApi @@ -52,7 +52,7 @@ private[ironmq] final class IronMqPullStage(queueName: String, settings: IronMqS override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with StageLogging { - implicit def ec: ExecutionContextExecutor = materializer.executionContext + implicit def ec: ExecutionContext = materializer.executionContext // This flag avoid run concurrent fetch from IronMQ private var fetching: Boolean = false diff --git a/ironmq/src/test/scala/akka/stream/alpakka/ironmq/IronMqSpec.scala b/ironmq/src/test/scala/akka/stream/alpakka/ironmq/IronMqSpec.scala index 2b894ed5d9..89e9bfd0da 100644 --- a/ironmq/src/test/scala/akka/stream/alpakka/ironmq/IronMqSpec.scala +++ b/ironmq/src/test/scala/akka/stream/alpakka/ironmq/IronMqSpec.scala @@ -4,21 +4,20 @@ package akka.stream.alpakka.ironmq -import java.util.UUID - import akka.actor.ActorSystem +import akka.stream.Materializer import akka.stream.alpakka.ironmq.impl.IronMqClient import akka.stream.alpakka.testkit.scaladsl.LogCapturing -import akka.stream.Materializer import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.concurrent.ScalaFutures import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec -import scala.concurrent.{Await, ExecutionContext} +import java.util.UUID import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext} import scala.util.hashing.MurmurHash3 -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec abstract class IronMqSpec extends AnyWordSpec @@ -30,7 +29,7 @@ abstract class IronMqSpec override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 15.seconds, interval = 1.second) val DefaultActorSystemTerminateTimeout: Duration = 10.seconds - private implicit val ec = ExecutionContext.global + private implicit val ec: ExecutionContext = ExecutionContext.global private var mutableIronMqClient = Option.empty[IronMqClient] private var mutableConfig = Option.empty[Config] diff --git a/jms/src/test/scala/akka/stream/alpakka/jms/impl/SoftReferenceCacheSpec.scala b/jms/src/test/scala/akka/stream/alpakka/jms/impl/SoftReferenceCacheSpec.scala index cd28a0b32f..cbaa47f9a5 100644 --- a/jms/src/test/scala/akka/stream/alpakka/jms/impl/SoftReferenceCacheSpec.scala +++ b/jms/src/test/scala/akka/stream/alpakka/jms/impl/SoftReferenceCacheSpec.scala @@ -3,14 +3,13 @@ */ package akka.stream.alpakka.jms.impl -import java.util.concurrent.Executors -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} - -import org.scalatest.wordspec.AnyWordSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} +import java.util.concurrent.Executors +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} class SoftReferenceCacheSpec extends AnyWordSpec with Matchers { diff --git a/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsAckConnectorsSpec.scala b/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsAckConnectorsSpec.scala index 9c921256e9..af77224d97 100644 --- a/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsAckConnectorsSpec.scala +++ b/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsAckConnectorsSpec.scala @@ -23,7 +23,7 @@ import scala.util.{Failure, Success} class JmsAckConnectorsSpec extends JmsSpec { - override implicit val patienceConfig = PatienceConfig(2.minutes) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(2.minutes) "The JMS Ack Connectors" should { "publish and consume strings through a queue" in withConnectionFactory() { connectionFactory => diff --git a/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala b/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala index ebc78f2762..813ef0af7f 100644 --- a/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala +++ b/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala @@ -25,7 +25,7 @@ import scala.util.{Failure, Success} class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec { - override implicit val patienceConfig = PatienceConfig(2.minutes) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(2.minutes) "The JMS Ack Connectors" should { "publish and consume strings through a queue" in withConnectionFactory() { connectionFactory => diff --git a/jms/src/test/scala/docs/scaladsl/JmsIbmmqConnectorsSpec.scala b/jms/src/test/scala/docs/scaladsl/JmsIbmmqConnectorsSpec.scala index a519147252..13935cc22b 100644 --- a/jms/src/test/scala/docs/scaladsl/JmsIbmmqConnectorsSpec.scala +++ b/jms/src/test/scala/docs/scaladsl/JmsIbmmqConnectorsSpec.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration._ import scala.concurrent.Future class JmsIbmmqConnectorsSpec extends JmsSpec { - override implicit val patienceConfig = PatienceConfig(2.minutes) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(2.minutes) "The JMS Ibmmq Connectors" should { val queueConnectionFactory = { diff --git a/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala b/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala index 0ceaa7e378..6059a1b5ab 100644 --- a/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala +++ b/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala @@ -25,7 +25,7 @@ class JmsTxConnectorsSpec extends JmsSharedServerSpec { private final val log = LoggerFactory.getLogger(classOf[JmsTxConnectorsSpec]) - override implicit val patienceConfig = PatienceConfig(2.minutes) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(2.minutes) "The JMS Transactional Connectors" should { "publish and consume strings through a queue" in withConnectionFactory() { connectionFactory => diff --git a/kinesis/src/test/scala/akka/stream/alpakka/kinesis/DefaultTestContext.scala b/kinesis/src/test/scala/akka/stream/alpakka/kinesis/DefaultTestContext.scala index 6d1af74266..cf1a0f2ede 100644 --- a/kinesis/src/test/scala/akka/stream/alpakka/kinesis/DefaultTestContext.scala +++ b/kinesis/src/test/scala/akka/stream/alpakka/kinesis/DefaultTestContext.scala @@ -4,14 +4,13 @@ package akka.stream.alpakka.kinesis -import java.util.concurrent.{Executors, TimeoutException} - import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} +import java.util.concurrent.{Executors, TimeoutException} import scala.concurrent.duration._ -import scala.concurrent.{blocking, Await, ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.{blocking, Await, ExecutionContext} trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach { this: Suite => @@ -23,7 +22,7 @@ trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach { thi """) ) private val threadPool = Executors.newFixedThreadPool(10) - implicit protected val executionContext: ExecutionContextExecutor = + implicit protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool) override protected def afterAll(): Unit = { diff --git a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala index 557001cfef..76fb0e1bf9 100644 --- a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala +++ b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala @@ -39,7 +39,7 @@ class MongoSinkSpec val codecRegistry = fromRegistries(fromProviders(classOf[Number], classOf[DomainObject]), DEFAULT_CODEC_REGISTRY) - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 10.seconds, interval = 100.millis) diff --git a/mongodb/src/test/scala/docs/scaladsl/MongoSourceSpec.scala b/mongodb/src/test/scala/docs/scaladsl/MongoSourceSpec.scala index 1cd977f92c..a41c445603 100644 --- a/mongodb/src/test/scala/docs/scaladsl/MongoSourceSpec.scala +++ b/mongodb/src/test/scala/docs/scaladsl/MongoSourceSpec.scala @@ -31,7 +31,7 @@ class MongoSourceSpec with LogCapturing { // #init-system - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() // #init-system override implicit val patienceConfig: PatienceConfig = diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala index 4b5c29ddd6..cb148982cd 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala @@ -4,6 +4,7 @@ package docs.scaladsl +import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.Behaviors import akka.stream.alpakka.mqtt.streaming.MqttSessionSettings import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttClientSession, ActorMqttServerSession} @@ -11,7 +12,8 @@ import org.scalatest.wordspec.AnyWordSpec class MqttTypedActorSystemSpec extends AnyWordSpec { - implicit val actorSystem = akka.actor.typed.ActorSystem(Behaviors.ignore, "MqttTypedActorSystemSpec") + implicit val actorSystem: ActorSystem[Nothing] = + akka.actor.typed.ActorSystem(Behaviors.ignore, "MqttTypedActorSystemSpec") "A typed actor system" should { "allow client creation" in { @@ -31,7 +33,7 @@ class MqttTypedActorSystemSpec extends AnyWordSpec { class MqttClassicActorSystemSpec extends AnyWordSpec { - implicit val actorSystem = akka.actor.ActorSystem("MqttClassicActorSystemSpec") + implicit val actorSystem: akka.actor.ActorSystem = akka.actor.ActorSystem("MqttClassicActorSystemSpec") "A typed actor system" should { "allow client creation" in { diff --git a/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala b/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala index 28720d5d21..d6c4df4ec1 100644 --- a/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala +++ b/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala @@ -8,6 +8,8 @@ import akka.actor.ActorSystem import akka.stream.alpakka.pravega.{ PravegaEvent, ReaderSettingsBuilder, + TableReaderSettingsBuilder, + TableWriterSettings, TableWriterSettingsBuilder, WriterSettingsBuilder } @@ -17,21 +19,21 @@ import io.pravega.client.stream.Serializer import io.pravega.client.stream.impl.UTF8StringSerializer import java.nio.ByteBuffer -import akka.stream.alpakka.pravega.TableReaderSettingsBuilder import akka.stream.alpakka.pravega.scaladsl.PravegaTable import akka.stream.alpakka.pravega.scaladsl.Pravega + import scala.util.Using import io.pravega.client.tables.TableKey class PravegaReadWriteDocs { - implicit val system = ActorSystem("PravegaDocs") + implicit val system: ActorSystem = ActorSystem("PravegaDocs") val serializer = new UTF8StringSerializer implicit def personSerialiser: Serializer[Person] = ??? - implicit val intSerializer = new Serializer[Int] { + implicit val intSerializer: Serializer[Int] = new Serializer[Int] { override def serialize(value: Int): ByteBuffer = { val buff = ByteBuffer.allocate(4).putInt(value) buff.position(0) @@ -89,7 +91,7 @@ class PravegaReadWriteDocs { } - implicit val tablewriterSettings = TableWriterSettingsBuilder[Int, Person]() + implicit val tablewriterSettings: TableWriterSettings[Int, Person] = TableWriterSettingsBuilder[Int, Person]() .withKeyExtractor(id => new TableKey(intSerializer.serialize(id))) .build() diff --git a/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala b/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala index efccaad1c0..19903c4d1b 100644 --- a/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala +++ b/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala @@ -22,9 +22,9 @@ import io.pravega.client.tables.TableKey class PravegaSettingsSpec extends PravegaBaseSpec with Matchers { - implicit val serializer = new UTF8StringSerializer + implicit val serializer: UTF8StringSerializer = new UTF8StringSerializer - implicit val intSerializer = new Serializer[Int] { + implicit val intSerializer: Serializer[Int] = new Serializer[Int] { override def serialize(value: Int): ByteBuffer = { val buff = ByteBuffer.allocate(4).putInt(value) buff.position(0) diff --git a/pravega/src/test/scala/docs/scaladsl/Serializers.scala b/pravega/src/test/scala/docs/scaladsl/Serializers.scala index 986f8d97ee..c6918e4d87 100644 --- a/pravega/src/test/scala/docs/scaladsl/Serializers.scala +++ b/pravega/src/test/scala/docs/scaladsl/Serializers.scala @@ -10,9 +10,9 @@ import io.pravega.client.stream.impl.UTF8StringSerializer object Serializers { - implicit val stringSerializer = new UTF8StringSerializer() + implicit val stringSerializer: UTF8StringSerializer = new UTF8StringSerializer() - implicit val personSerializer = new Serializer[Person] { + implicit val personSerializer: Serializer[Person] = new Serializer[Person] { def serialize(x: Person): ByteBuffer = { val name = x.firstname.getBytes("UTF-8") val buff = ByteBuffer.allocate(4 + name.length).putInt(x.id) @@ -29,7 +29,7 @@ object Serializers { } - implicit val intSerializer = new Serializer[Int] { + implicit val intSerializer: Serializer[Int] = new Serializer[Int] { override def serialize(value: Int): ByteBuffer = { val buff = ByteBuffer.allocate(4).putInt(value) buff.position(0) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8f0148ae21..35acdd28e1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,8 +6,8 @@ object Dependencies { val CronBuild = sys.env.get("GITHUB_EVENT_NAME").contains("schedule") val Scala213 = "2.13.10" // update even in link-validator.conf - val Scala212 = "2.12.17" - val Scala3 = "3.2.2" + val Scala212 = "2.12.18" + val Scala3 = "3.3.1" val Scala2Versions = Seq(Scala213, Scala212) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 diff --git a/reference/src/main/scala/akka/stream/alpakka/reference/Resource.scala b/reference/src/main/scala/akka/stream/alpakka/reference/Resource.scala index 54dca1f499..6b60d6ea33 100644 --- a/reference/src/main/scala/akka/stream/alpakka/reference/Resource.scala +++ b/reference/src/main/scala/akka/stream/alpakka/reference/Resource.scala @@ -105,7 +105,7 @@ object ResourceSettings { * there is only one instance of the resource instantiated per Actor System. */ final class ResourceExt private (sys: ExtendedActorSystem) extends Extension { - implicit val resource = Resource(ResourceSettings()(sys)) + implicit val resource: Resource = Resource(ResourceSettings()(sys)) sys.registerOnTermination(resource.cleanup()) } diff --git a/slick/src/test/scala/docs/scaladsl/DocSnippets.scala b/slick/src/test/scala/docs/scaladsl/DocSnippets.scala index dba29b41e6..ba21295d77 100644 --- a/slick/src/test/scala/docs/scaladsl/DocSnippets.scala +++ b/slick/src/test/scala/docs/scaladsl/DocSnippets.scala @@ -6,6 +6,8 @@ package docs.scaladsl import akka.Done import akka.actor.ActorSystem + +import scala.concurrent.ExecutionContext //#important-imports import akka.stream.alpakka.slick.scaladsl._ import akka.stream.scaladsl._ @@ -15,11 +17,11 @@ import slick.jdbc.GetResult import scala.concurrent.Future object SlickSourceWithPlainSQLQueryExample extends App { - implicit val system = ActorSystem() - implicit val ec = system.dispatcher + implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher //#source-example - implicit val session = SlickSession.forConfig("slick-h2") + implicit val session: SlickSession = SlickSession.forConfig("slick-h2") system.registerOnTermination(session.close()) // The example domain @@ -29,7 +31,7 @@ object SlickSourceWithPlainSQLQueryExample extends App { // into instances of the User class. // Please import slick.jdbc.GetResult // See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#result-sets" - implicit val getUserResult = GetResult(r => User(r.nextInt(), r.nextString())) + implicit val getUserResult: GetResult[User] = GetResult(r => User(r.nextInt(), r.nextString())) // This import enables the use of the Slick sql"...", // sqlu"...", and sqlt"..." String interpolators. @@ -51,11 +53,11 @@ object SlickSourceWithPlainSQLQueryExample extends App { } object SlickSourceWithTypedQueryExample extends App { - implicit val system = ActorSystem() - implicit val ec = system.dispatcher + implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher //#source-with-typed-query - implicit val session = SlickSession.forConfig("slick-h2") + implicit val session: SlickSession = SlickSession.forConfig("slick-h2") system.registerOnTermination(session.close()) // This import brings everything you need into scope @@ -63,9 +65,9 @@ object SlickSourceWithTypedQueryExample extends App { // The example domain class Users(tag: Tag) extends Table[(Int, String)](tag, "ALPAKKA_SLICK_SCALADSL_TEST_USERS") { - def id = column[Int]("ID") - def name = column[String]("NAME") - def * = (id, name) + def id: Rep[Int] = column[Int]("ID") + def name: Rep[String] = column[String]("NAME") + override def * = (id, name) } // Stream the results of a query @@ -83,11 +85,11 @@ object SlickSourceWithTypedQueryExample extends App { } object SlickSinkExample extends App { - implicit val system = ActorSystem() - implicit val ec = system.dispatcher + implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher //#sink-example - implicit val session = SlickSession.forConfig("slick-h2") + implicit val session: SlickSession = SlickSession.forConfig("slick-h2") system.registerOnTermination(session.close()) // The example domain @@ -115,11 +117,11 @@ object SlickSinkExample extends App { } object SlickFlowExample extends App { - implicit val system = ActorSystem() - implicit val ec = system.dispatcher + implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher //#flow-example - implicit val session = SlickSession.forConfig("slick-h2") + implicit val session: SlickSession = SlickSession.forConfig("slick-h2") system.registerOnTermination(session.close()) // The example domain @@ -159,11 +161,11 @@ object SlickFlowWithPassThroughExample extends App { def map[B](f: A => B): KafkaMessage[B] = KafkaMessage(f(msg), offset) } - implicit val system = ActorSystem() - implicit val ec = system.dispatcher + implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher //#flowWithPassThrough-example - implicit val session = SlickSession.forConfig("slick-h2") + implicit val session: SlickSession = SlickSession.forConfig("slick-h2") system.registerOnTermination(session.close()) // The example domain diff --git a/slick/src/test/scala/docs/scaladsl/SlickSpec.scala b/slick/src/test/scala/docs/scaladsl/SlickSpec.scala index 5f66dd5560..f7373b310d 100644 --- a/slick/src/test/scala/docs/scaladsl/SlickSpec.scala +++ b/slick/src/test/scala/docs/scaladsl/SlickSpec.scala @@ -12,14 +12,14 @@ import akka.stream.scaladsl._ import akka.testkit.TestKit import org.scalatest._ import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.must.Matchers +import org.scalatest.wordspec.AnyWordSpec import slick.basic.DatabaseConfig import slick.dbio.DBIOAction import slick.jdbc.{GetResult, JdbcProfile} import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import org.scalatest.matchers.must.Matchers -import org.scalatest.wordspec.AnyWordSpec +import scala.concurrent.{Await, ExecutionContext, Future} /** * This unit test is run using a local H2 database using @@ -33,7 +33,7 @@ class SlickSpec with Matchers with LogCapturing { //#init-mat - implicit val system = ActorSystem() + implicit val system: ActorSystem = ActorSystem() //#init-mat //#init-session @@ -49,9 +49,9 @@ class SlickSpec def * = (id, name) } - implicit val ec = system.dispatcher - implicit val defaultPatience = PatienceConfig(timeout = 3.seconds, interval = 50.millis) - implicit val getUserResult = GetResult(r => User(r.nextInt(), r.nextString())) + implicit val ec: ExecutionContext = system.dispatcher + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 3.seconds, interval = 50.millis) + implicit val getUserResult: GetResult[User] = GetResult(r => User(r.nextInt(), r.nextString())) val users = (1 to 40).map(i => User(i, s"Name$i")).toSet @@ -154,7 +154,7 @@ class SlickSpec "insert 40 records into a table (no parallelism)" in { //#init-db-config-session val databaseConfig = DatabaseConfig.forConfig[JdbcProfile]("slick-h2") - implicit val session = SlickSession.forConfig(databaseConfig) + implicit val session: SlickSession = SlickSession.forConfig(databaseConfig) //#init-db-config-session val inserted = Source(users) diff --git a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala index 33ef9a82d7..cd5abda96a 100644 --- a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala +++ b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala @@ -38,7 +38,7 @@ class SqsSourceSpec extends AnyFlatSpec with ScalaFutures with Matchers with Def import SqsSourceSpec._ - implicit override val patienceConfig = PatienceConfig(timeout = 10.seconds, interval = 100.millis) + implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 10.seconds, interval = 100.millis) trait IntegrationFixture { val queueUrl: String = randomQueueUrl() diff --git a/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala b/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala index 6ce0bde3d2..6d7d9521d3 100644 --- a/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala +++ b/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala @@ -4,8 +4,6 @@ package docs.scaladsl -import java.net.InetSocketAddress -import java.nio.charset.StandardCharsets.UTF_8 import akka.actor.{Actor, ActorLogging, ActorSystem, Props, Status} import akka.http.scaladsl.coding.Coders import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling @@ -14,15 +12,17 @@ import akka.http.scaladsl.model.StatusCodes.BadRequest import akka.http.scaladsl.model.headers.`Last-Event-ID` import akka.http.scaladsl.server.{Directives, Route} import akka.pattern.pipe -import akka.stream.scaladsl.{Sink, Source} import akka.stream.ThrottleMode +import akka.stream.scaladsl.{Sink, Source} import akka.testkit.SocketUtil import akka.{Done, NotUsed} import org.scalatest.BeforeAndAfterAll +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets.UTF_8 import scala.collection.immutable import scala.concurrent.duration.DurationInt -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} //#event-source import akka.http.scaladsl.Http import akka.http.scaladsl.model.sse.ServerSentEvent @@ -83,11 +83,11 @@ object EventSourceSpec { import Server._ import context.dispatcher - private implicit val sys = context.system + private implicit val sys: ActorSystem = context.system context.system.scheduler.scheduleOnce(1.second, self, Bind) - override def receive = unbound + override def receive: Receive = unbound private def unbound: Receive = { case Bind => @@ -139,8 +139,8 @@ object EventSourceSpec { final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll { import EventSourceSpec._ - private implicit val system = ActorSystem() - private implicit val ec = system.dispatcher + private implicit val system: ActorSystem = ActorSystem() + private implicit val ec: ExecutionContext = system.dispatcher "EventSource" should { "communicate correctly with an instable HTTP server" in { @@ -168,7 +168,7 @@ final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAf .runWith(Sink.seq) //#consume-events - val expected = Seq.tabulate(nrOfSamples)(_ + 3).map(toServerSentEvent(true)) + val expected = Seq.tabulate(nrOfSamples)(_ + 3).map(toServerSentEvent(setEventId = true)) events.map(_ shouldBe expected).andThen { case _ => system.stop(server) } } @@ -178,7 +178,7 @@ final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAf val server = system.actorOf(Props(new Server(host, port, 2))) val eventSource = EventSource(Uri(s"http://$host:$port"), send, Some("2"), 1.second) val events = eventSource.take(nrOfSamples).runWith(Sink.seq) - val expected = Seq.tabulate(nrOfSamples)(_ % 2 + 3).map(toServerSentEvent(false)) + val expected = Seq.tabulate(nrOfSamples)(_ % 2 + 3).map(toServerSentEvent(setEventId = false)) events.map(_ shouldBe expected).andThen { case _ => system.stop(server) } } @@ -190,15 +190,15 @@ final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAf val eventSource = EventSource(Uri(s"http://$host:$port/gzipped"), send, None, 1.second) val events = eventSource.take(nrOfSamples).runWith(Sink.seq) - val expected = Seq.tabulate(nrOfSamples)(_ + 1).map(toServerSentEvent(true)) + val expected = Seq.tabulate(nrOfSamples)(_ + 1).map(toServerSentEvent(setEventId = true)) events.map(_ shouldBe expected).andThen { case _ => system.stop(server) } } } - override protected def afterAll() = { + override protected def afterAll(): Unit = { Await.ready(system.terminate(), 42.seconds) super.afterAll() } - private def send(request: HttpRequest) = Http().singleRequest(request) + private def send(request: HttpRequest): Future[HttpResponse] = Http().singleRequest(request) } diff --git a/text/src/test/scala/akka/stream/alpakka/text/scaladsl/CharsetCodingFlowsSpec.scala b/text/src/test/scala/akka/stream/alpakka/text/scaladsl/CharsetCodingFlowsSpec.scala index de21e23e19..a95e48dead 100644 --- a/text/src/test/scala/akka/stream/alpakka/text/scaladsl/CharsetCodingFlowsSpec.scala +++ b/text/src/test/scala/akka/stream/alpakka/text/scaladsl/CharsetCodingFlowsSpec.scala @@ -4,9 +4,6 @@ package akka.stream.alpakka.text.scaladsl -import java.nio.charset.{Charset, StandardCharsets, UnmappableCharacterException} -import java.nio.file.Paths - import akka.actor.ActorSystem import akka.stream.IOResult import akka.stream.alpakka.testkit.scaladsl.LogCapturing @@ -19,9 +16,11 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.{BeforeAndAfterAll, RecoverMethods} +import java.nio.charset.{Charset, StandardCharsets, UnmappableCharacterException} +import java.nio.file.Paths import scala.collection.immutable import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.concurrent.{ExecutionContext, Future} class CharsetCodingFlowsSpec extends TestKit(ActorSystem("charset")) @@ -32,7 +31,7 @@ class CharsetCodingFlowsSpec with RecoverMethods with LogCapturing { - private implicit val executionContext: ExecutionContextExecutor = system.dispatcher + private implicit val executionContext: ExecutionContext = system.dispatcher val multiByteChars = "äåû經濟商行政管理总局التجارى" @@ -60,10 +59,10 @@ class CharsetCodingFlowsSpec } "be illustrated in a documentation example" in { - import java.nio.charset.StandardCharsets - import akka.stream.scaladsl.FileIO + import java.nio.charset.StandardCharsets + // #encoding import scala.collection.JavaConverters._ val targetFile = Paths.get("target/outdata.txt") @@ -98,10 +97,10 @@ class CharsetCodingFlowsSpec "Transcoding" should { "be illustrated in a documentation example" in { - import java.nio.charset.StandardCharsets - import akka.stream.scaladsl.FileIO + import java.nio.charset.StandardCharsets + val utf16bytes = ByteString("äåûßêëé", StandardCharsets.UTF_16) val targetFile = Paths.get("target/outdata-transcoding.txt") val byteStringSource: Source[ByteString, _] = diff --git a/udp/src/test/scala/docs/scaladsl/UdpSpec.scala b/udp/src/test/scala/docs/scaladsl/UdpSpec.scala index a896aa4c75..dc884aa180 100644 --- a/udp/src/test/scala/docs/scaladsl/UdpSpec.scala +++ b/udp/src/test/scala/docs/scaladsl/UdpSpec.scala @@ -31,8 +31,8 @@ class UdpSpec with BeforeAndAfterAll with LogCapturing { - implicit val mat = Materializer(system) - implicit val pat = PatienceConfig(3.seconds, 50.millis) + implicit val mat: Materializer = Materializer(system) + implicit val pat: PatienceConfig = PatienceConfig(3.seconds, 50.millis) // #bind-address val bindToLocal = new InetSocketAddress("localhost", 0)