From d87547174433763b65fc90f2998371dc416b30d2 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Mon, 18 Sep 2023 22:00:26 +0200 Subject: [PATCH 1/3] Enable Scala 3 on more connectors --- .../storagequeue/javadsl/AzureQueueSink.scala | 2 +- build.sbt | 27 +++++++++------- .../ElasticsearchConnectorBehaviour.scala | 6 ++-- .../scaladsl/ElasticsearchSpecUtils.scala | 2 +- .../docs/scaladsl/ElasticsearchV5Spec.scala | 18 +++++------ .../docs/scaladsl/ElasticsearchV7Spec.scala | 18 +++++------ .../OpensearchConnectorBehaviour.scala | 6 ++-- .../docs/scaladsl/OpensearchV1Spec.scala | 18 +++++------ .../alpakka/file/javadsl/LogRotatorSink.scala | 1 + .../docs/scaladsl/LogRotatorSinkSpec.scala | 4 +-- .../scala/docs/scaladsl/HdfsWriterSpec.scala | 2 +- .../test/scala/docs/scaladsl/FlowSpec.scala | 4 +-- .../scala/docs/scaladsl/InfluxDbSpec.scala | 2 +- .../alpakka/json/impl/JsonStreamReader.scala | 2 +- .../docs/scaladsl/PravegaReadWriteDocs.scala | 2 +- project/Dependencies.scala | 4 +-- .../test/scala/docs/scaladsl/SolrSpec.scala | 32 +++++++++---------- 17 files changed, 77 insertions(+), 73 deletions(-) diff --git a/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/javadsl/AzureQueueSink.scala b/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/javadsl/AzureQueueSink.scala index d7265adf36..d54a2dafc1 100644 --- a/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/javadsl/AzureQueueSink.scala +++ b/azure-storage-queue/src/main/scala/akka/stream/alpakka/azure/storagequeue/javadsl/AzureQueueSink.scala @@ -42,7 +42,7 @@ object AzureQueueWithTimeoutsSink { */ def create(cloudQueue: Supplier[CloudQueue]): Sink[MessageWithTimeouts, CompletionStage[Done]] = AzureQueueSink.fromFunction( - { input: MessageWithTimeouts => + { (input: MessageWithTimeouts) => AzureQueueSinkFunctions .addMessage(() => cloudQueue.get)(input.message, input.timeToLive, input.initialVisibility) } diff --git a/build.sbt b/build.sbt index efee688f38..7690218790 100644 --- a/build.sbt +++ b/build.sbt @@ -132,7 +132,8 @@ lazy val awslambda = alpakkaProject("awslambda", "aws.lambda", Dependencies.AwsL lazy val azureStorageQueue = alpakkaProject( "azure-storage-queue", "azure.storagequeue", - Dependencies.AzureStorageQueue + Dependencies.AzureStorageQueue, + Scala3.settings ) lazy val cassandra = @@ -157,7 +158,7 @@ lazy val elasticsearch = alpakkaProject( ) // The name 'file' is taken by `sbt.file`, hence 'files' -lazy val files = alpakkaProject("file", "file", Dependencies.File) +lazy val files = alpakkaProject("file", "file", Dependencies.File, Scala3.settings) lazy val ftp = alpakkaProject( "ftp", @@ -245,9 +246,9 @@ lazy val googleCloudStorage = alpakkaProject( lazy val googleFcm = alpakkaProject("google-fcm", "google.firebase.fcm", Dependencies.GoogleFcm, Test / fork := true) .dependsOn(googleCommon) -lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, Test / fork := true) +lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, Scala3.settings, Test / fork := true) -lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs) +lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, Scala3.settings) lazy val huaweiPushKit = alpakkaProject("huawei-push-kit", "huawei.pushkit", Dependencies.HuaweiPushKit) @@ -256,6 +257,7 @@ lazy val influxdb = alpakkaProject( "influxdb", "influxdb", Dependencies.InfluxDB, + Scala3.settings, Compile / scalacOptions ++= Seq( // JDK 11: method 
isAccessible in class AccessibleObject is deprecated "-Wconf:cat=deprecation:s" @@ -271,15 +273,15 @@ lazy val ironmq = alpakkaProject( lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, Scala3.settings) -lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming) +lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming, Scala3.settings) lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis).settings(Scala3.settings) -lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu) +lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu, Scala3.settings) lazy val mongodb = alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb) -lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt) +lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt, Scala3.settings) lazy val mqttStreaming = alpakkaProject("mqtt-streaming", "mqttStreaming", Dependencies.MqttStreaming) @@ -306,6 +308,7 @@ lazy val pravega = alpakkaProject( "pravega", "pravega", Dependencies.Pravega, + Scala3.settings, Test / fork := true ) @@ -315,7 +318,7 @@ lazy val springWeb = alpakkaProject( Dependencies.SpringWeb ) -lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs") +lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs", Scala3.settings) lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick) @@ -324,20 +327,20 @@ lazy val eventbridge = lazy val sns = alpakkaProject("sns", "aws.sns", Dependencies.Sns).settings(Scala3.settings) -lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr) +lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr, Scala3.settings) lazy val sqs = alpakkaProject("sqs", "aws.sqs", Dependencies.Sqs).settings(Scala3.settings) -lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse) +lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse, Scala3.settings) -lazy val text = alpakkaProject("text", "text") +lazy val text = alpakkaProject("text", "text", Scala3.settings) lazy val udp = alpakkaProject("udp", "udp") lazy val unixdomainsocket = alpakkaProject("unix-domain-socket", "unixdomainsocket", Dependencies.UnixDomainSocket) -lazy val xml = alpakkaProject("xml", "xml", Dependencies.Xml) +lazy val xml = alpakkaProject("xml", "xml", Dependencies.Xml, Scala3.settings) // Java Platform version for JavaDoc creation val JavaDocLinkVersion = "11" diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala index b497ed6209..ce0dfc359d 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala @@ -378,7 +378,7 @@ trait ElasticsearchConnectorBehaviour { val indexName = "sink7" val createBooks = Source(books) - .map { book: (String, Book) => + .map { (book: (String, Book)) => WriteMessage.createUpsertMessage(id = book._1, source = book._2) } .via( @@ -411,7 +411,7 @@ trait ElasticsearchConnectorBehaviour { // Update sink7/_doc with the second dataset val upserts = Source(updatedBooks) - .map { book: (String, JsObject) => + .map { (book: (String, JsObject)) => WriteMessage.createUpsertMessage(id = book._1, source = book._2) } .via( @@ -483,7 +483,7 @@ trait ElasticsearchConnectorBehaviour { "read and write document-version if configured to do so" 
in { case class VersionTestDoc(id: String, name: String, value: Int) - implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc) + implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc.apply) val indexName = "version-test-scala" val typeName = "_doc" diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala index 24d58f9c81..33be9299c7 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala @@ -35,7 +35,7 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with ScalaFutures => case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10) - implicit val format: JsonFormat[Book] = jsonFormat3(Book) + implicit val format: JsonFormat[Book] = jsonFormat3(Book.apply) //#define-class def register(connectionSettings: ElasticsearchConnectionSettings, diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala index a7c88b642b..01136db495 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala @@ -56,7 +56,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[spray.json.JsObject] => + .map { (message: ReadResult[spray.json.JsObject]) => val book: Book = jsonReader[Book].read(message.source) WriteMessage.createIndexMessage(message.id, book) } @@ -93,7 +93,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage.createIndexMessage(message.id, message.source) } .runWith( @@ -129,7 +129,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage.createIndexMessage(message.id, message.source) } .via( @@ -209,7 +209,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val indexName = "sink6" val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -261,7 +261,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val indexName = "sink6-bulk" val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -316,7 +316,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val indexName = "sink6-nop" val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -373,7 +373,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt register(connectionSettings, indexName, "dummy", 10) // need 
to create index else exception in reading below val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -464,7 +464,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage .createIndexMessage(message.id, message.source) .withIndexName(customIndexName) // Setting the index-name to use for this document @@ -500,7 +500,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt case class TestDoc(id: String, a: String, b: Option[String], c: String) //#custom-search-params - implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc) + implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc.apply) val indexName = "custom-search-params-test-scala" val typeName = "_doc" diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala index 32663810da..113a6ecf36 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala @@ -48,7 +48,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[spray.json.JsObject] => + .map { (message: ReadResult[spray.json.JsObject]) => val book: Book = jsonReader[Book].read(message.source) WriteMessage.createIndexMessage(message.id, book) } @@ -84,7 +84,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage.createIndexMessage(message.id, message.source) } .runWith( @@ -119,7 +119,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage.createIndexMessage(message.id, message.source) } .via( @@ -196,7 +196,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val indexName = "sink6" val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -248,7 +248,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val indexName = "sink6-bulk" val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -304,7 +304,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val indexName = "sink6-nop" val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -362,7 +362,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt register(connectionSettings, 
indexName, "dummy", 10) // need to create index else exception in reading below val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -451,7 +451,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage .createIndexMessage(message.id, message.source) .withIndexName(customIndexName) // Setting the index-name to use for this document @@ -484,7 +484,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt case class TestDoc(id: String, a: String, b: Option[String], c: String) - implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc) + implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc.apply) val indexName = "custom-search-params-test-scala" val typeName = "_doc" diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala index bf8b9e877a..97c6c58f31 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala @@ -378,7 +378,7 @@ trait OpensearchConnectorBehaviour { val indexName = "sink7" val createBooks = Source(books) - .map { book: (String, Book) => + .map { (book: (String, Book)) => WriteMessage.createUpsertMessage(id = book._1, source = book._2) } .via( @@ -411,7 +411,7 @@ trait OpensearchConnectorBehaviour { // Update sink7/_doc with the second dataset val upserts = Source(updatedBooks) - .map { book: (String, JsObject) => + .map { (book: (String, JsObject)) => WriteMessage.createUpsertMessage(id = book._1, source = book._2) } .via( @@ -483,7 +483,7 @@ trait OpensearchConnectorBehaviour { "read and write document-version if configured to do so" in { case class VersionTestDoc(id: String, name: String, value: Int) - implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc) + implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc.apply) val indexName = "version-test-scala" val typeName = "_doc" diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala index 3f6130384c..d135886cc7 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala @@ -57,7 +57,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[spray.json.JsObject] => + .map { (message: ReadResult[spray.json.JsObject]) => val book: Book = jsonReader[Book].read(message.source) WriteMessage.createIndexMessage(message.id, book) } @@ -95,7 +95,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage.createIndexMessage(message.id, message.source) } .runWith( @@ -131,7 +131,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils 
query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage.createIndexMessage(message.id, message.source) } .via( @@ -212,7 +212,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils val indexName = "sink6" val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -264,7 +264,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils val indexName = "sink6-bulk" val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -320,7 +320,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils val indexName = "sink6-nop" val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -378,7 +378,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book val id = book.title @@ -470,7 +470,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils query = """{"match_all": {}}""", settings = baseSourceSettings ) - .map { message: ReadResult[Book] => + .map { (message: ReadResult[Book]) => WriteMessage .createIndexMessage(message.id, message.source) .withIndexName(customIndexName) // Setting the index-name to use for this document @@ -504,7 +504,7 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils case class TestDoc(id: String, a: String, b: Option[String], c: String) - implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc) + implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc.apply) val indexName = "custom-search-params-test-scala" val typeName = "_doc" diff --git a/file/src/main/scala/akka/stream/alpakka/file/javadsl/LogRotatorSink.scala b/file/src/main/scala/akka/stream/alpakka/file/javadsl/LogRotatorSink.scala index 29148f7724..0680f737d2 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/javadsl/LogRotatorSink.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/javadsl/LogRotatorSink.scala @@ -11,6 +11,7 @@ import java.util.concurrent.CompletionStage import akka.Done import akka.stream.javadsl import akka.stream.scaladsl +import akka.stream.scaladsl.SinkToCompletionStage import akka.stream.javadsl.Sink import akka.util.ByteString import akka.japi.function diff --git a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala index 8e4ea266b8..8d56ccb39b 100644 --- a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala +++ b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala @@ -85,7 +85,7 @@ class LogRotatorSinkSpec "complete when consuming an empty source" in assertAllStagesStopped { val triggerCreator: () => ByteString => Option[Path] = () => { - element: ByteString => 
fail("trigger creator should not be called") + (element: ByteString) => fail("trigger creator should not be called") } val rotatorSink: Sink[ByteString, Future[Done]] = @@ -102,7 +102,7 @@ class LogRotatorSinkSpec val fileSizeTriggerCreator: () => ByteString => Option[Path] = () => { val max = 10 * 1024 * 1024 var size: Long = max - element: ByteString => + (element: ByteString) => if (size + element.size > max) { val path = Files.createTempFile("out-", ".log") size = element.size diff --git a/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala b/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala index 1ef2122eb8..1718ffc214 100644 --- a/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala +++ b/hdfs/src/test/scala/docs/scaladsl/HdfsWriterSpec.scala @@ -254,7 +254,7 @@ class HdfsWriterSpec committedOffsets = committedOffsets :+ offset val resF = Source(messagesFromKafka) - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val book = kafkaMessage.book // Transform message so that we can write to hdfs HdfsWriteMessage(ByteString(book.title), kafkaMessage.offset) diff --git a/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala b/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala index e48693ea4a..20b311d445 100644 --- a/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala +++ b/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala @@ -95,7 +95,7 @@ class FlowSpec committedOffsets = committedOffsets :+ offset val f1 = Source(messagesFromKafka) - .map { kafkaMessage: KafkaMessage => + .map { (kafkaMessage: KafkaMessage) => val cpu = kafkaMessage.cpu println("hostname: " + cpu.getHostname) @@ -105,7 +105,7 @@ class FlowSpec .via( InfluxDbFlow.typedWithPassThrough(classOf[InfluxDbFlowCpu]) ) - .map { messages: Seq[InfluxDbWriteResult[InfluxDbFlowCpu, KafkaOffset]] => + .map { (messages: Seq[InfluxDbWriteResult[InfluxDbFlowCpu, KafkaOffset]]) => messages.foreach { message => commitToKafka(message.writeMessage.passThrough) } diff --git a/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala b/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala index ce5e1dbb9e..c48739c24c 100644 --- a/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala +++ b/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala @@ -71,7 +71,7 @@ class InfluxDbSpec //#run-typed val f1 = InfluxDbSource .typed(classOf[InfluxDbSpecCpu], InfluxDbReadSettings(), influxDB, query) - .map { cpu: InfluxDbSpecCpu => + .map { (cpu: InfluxDbSpecCpu) => { val clonedCpu = cpu.cloneAt(cpu.getTime.plusSeconds(60000)) List(InfluxDbWriteMessage(clonedCpu)) diff --git a/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala b/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala index a7d0046731..0315930e0c 100644 --- a/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala +++ b/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala @@ -38,7 +38,7 @@ private[akka] final class JsonStreamReader(path: JsonPath) extends GraphStage[Fl private val config = surfer.configBuilder .bind(path, new JsonPathListener { override def onValue(value: Any, context: ParsingContext): Unit = - buffer = buffer.enqueue(ByteString(value.toString)) + buffer = buffer.enqueueAll(Seq(ByteString(value.toString))) }) .build private val parser = surfer.createNonBlockingParser(config) diff --git a/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala 
b/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala index d6c4df4ec1..2e7690dad3 100644 --- a/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala +++ b/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala @@ -81,7 +81,7 @@ class PravegaReadWriteDocs { Pravega .source(readerGroup, readerSettings) - .to(Sink.foreach { event: PravegaEvent[String] => + .to(Sink.foreach { (event: PravegaEvent[String]) => val message: String = event.message processMessage(message) }) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 35acdd28e1..9bb8aa2dd1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -294,7 +294,7 @@ object Dependencies { val Hdfs = Seq( libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-client" % HadoopVersion exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), // ApacheV2 - "org.typelevel" %% "cats-core" % "2.0.0", // MIT, + "org.typelevel" %% "cats-core" % "2.10.0", // MIT, "org.apache.hadoop" % "hadoop-hdfs" % HadoopVersion % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), // ApacheV2 "org.apache.hadoop" % "hadoop-common" % HadoopVersion % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), // ApacheV2 "org.apache.hadoop" % "hadoop-minicluster" % HadoopVersion % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), // ApacheV2 @@ -369,7 +369,7 @@ object Dependencies { val MongoDb = Seq( libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "4.7.2" // ApacheV2 + "org.mongodb.scala" %% "mongo-scala-driver" % "4.10.0" // ApacheV2 ) ) diff --git a/solr/src/test/scala/docs/scaladsl/SolrSpec.scala b/solr/src/test/scala/docs/scaladsl/SolrSpec.scala index c42cb7be38..1d06b94d43 100644 --- a/solr/src/test/scala/docs/scaladsl/SolrSpec.scala +++ b/solr/src/test/scala/docs/scaladsl/SolrSpec.scala @@ -82,7 +82,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#run-document val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) val doc: SolrInputDocument = bookToDoc(book) WriteMessage.createUpsertMessage(doc) @@ -132,7 +132,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#run-bean val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val title = tuple.getString("title") WriteMessage.createUpsertMessage(BookBean(title)) } @@ -175,7 +175,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#run-typed val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) WriteMessage.createUpsertMessage(book) } @@ -222,7 +222,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca // #typeds-flow val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) WriteMessage.createUpsertMessage(book) } @@ -297,7 +297,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#kafka-example // Note: This code mimics Alpakka Kafka APIs val copyCollection = kafkaConsumerSource - .map { kafkaMessage: CommittableMessage => + .map { (kafkaMessage: CommittableMessage) => val book = kafkaMessage.book // Transform message so that we can write to solr 
WriteMessage.createUpsertMessage(book).withPassThrough(kafkaMessage.committableOffset) @@ -345,7 +345,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) val doc: SolrInputDocument = bookToDoc(book) WriteMessage.createUpsertMessage(doc) @@ -366,7 +366,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#delete-documents val deleteDocuments = SolrSource .fromTupleStream(stream2) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val id = tuple.getFields.get("title").toString WriteMessage.createDeleteMessage[SolrInputDocument](id) } @@ -400,7 +400,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val upsertCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) .copy(comment = "Written by good authors.") val doc: SolrInputDocument = bookToDoc(book) @@ -422,7 +422,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#update-atomically-documents val updateCollection = SolrSource .fromTupleStream(stream2) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val id = tuple.getFields.get("title").toString val comment = tuple.getFields.get("comment").toString WriteMessage.createUpdateMessage[SolrInputDocument]( @@ -473,7 +473,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) WriteMessage.createUpsertMessage(book) } @@ -496,7 +496,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val deleteElements = SolrSource .fromTupleStream(stream2) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val title = tuple.getFields.get("title").toString WriteMessage.createDeleteMessage[Book](title) } @@ -528,7 +528,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple).copy(comment = "Written by good authors.", routerOpt = Some("router-value")) WriteMessage.createUpsertMessage(book) @@ -552,7 +552,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val updateCollection = SolrSource .fromTupleStream(stream2) - .map { tuple: Tuple => + .map { (tuple: Tuple) => WriteMessage .createUpdateMessage[Book]( idField = "title", @@ -603,7 +603,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca val copyCollection = SolrSource .fromTupleStream(stream) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val book: Book = tupleToBook(tuple) val doc: SolrInputDocument = bookToDoc(book) WriteMessage.createUpsertMessage(doc) @@ -624,7 +624,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#delete-documents-query val deleteByQuery = SolrSource .fromTupleStream(stream2) - .map { tuple: Tuple => + .map { (tuple: Tuple) => val title = tuple.getFields.get("title").toString WriteMessage.createDeleteByQueryMessage[SolrInputDocument]( s"""title:"$title" """ @@ -683,7 +683,7 @@ class SolrSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Sca //#kafka-example-PT // Note: This 
code mimics Alpakka Kafka APIs val copyCollection = kafkaConsumerSource - .map { offset: CommittableOffset => + .map { (offset: CommittableOffset) => // Transform message so that we can write to solr WriteMessage.createPassThrough(offset).withSource(new SolrInputDocument()) } From eded985edf3e679715e951ab52f84015c869998a Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Tue, 19 Sep 2023 08:14:04 +0200 Subject: [PATCH 2/3] Roll back Scala 3 for json-streaming --- build.sbt | 2 +- .../scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 7690218790..736e0c4f59 100644 --- a/build.sbt +++ b/build.sbt @@ -273,7 +273,7 @@ lazy val ironmq = alpakkaProject( lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, Scala3.settings) -lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming, Scala3.settings) +lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming) lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis).settings(Scala3.settings) diff --git a/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala b/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala index 0315930e0c..a7d0046731 100644 --- a/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala +++ b/json-streaming/src/main/scala/akka/stream/alpakka/json/impl/JsonStreamReader.scala @@ -38,7 +38,7 @@ private[akka] final class JsonStreamReader(path: JsonPath) extends GraphStage[Fl private val config = surfer.configBuilder .bind(path, new JsonPathListener { override def onValue(value: Any, context: ParsingContext): Unit = - buffer = buffer.enqueueAll(Seq(ByteString(value.toString))) + buffer = buffer.enqueue(ByteString(value.toString)) }) .build private val parser = surfer.createNonBlockingParser(config) From c32acbe1cc6cd75a952aea467f73a03c3ca9e7a5 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Tue, 19 Sep 2023 08:55:39 +0200 Subject: [PATCH 3/3] Remove unavailable blog post --- .../main/paradox/other-docs/webinars-presentations-articles.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/src/main/paradox/other-docs/webinars-presentations-articles.md b/docs/src/main/paradox/other-docs/webinars-presentations-articles.md index 5e9deeaf7f..5bafa7d994 100644 --- a/docs/src/main/paradox/other-docs/webinars-presentations-articles.md +++ b/docs/src/main/paradox/other-docs/webinars-presentations-articles.md @@ -45,8 +45,6 @@ presentation by Colin Breck, ScalaDays New York, June 2018 [Stream a file to AWS S3 using Akka Streams (via Alpakka) in Play Framework](https://blog.knoldus.com/stream-a-file-to-aws-s3-using-akka-streams-via-alpakka-in-play-framework/) blog by Sidharth Khattri, May 2018 -[Alpakka (Akka Streams) vs Apache Camel: who wins?](http://www.thedevpiece.com/alpakka-akka-streams-vs-apache-camel-who-wins/) Blog by Gabriel Francisco, May 2018 - [Alpakka – a new world of connectors for Reactive Enterprise Integration](https://www.youtube.com/watch?v=EcNZ2mJZmCk) presentation by Jan Pustelnik, Actyx, ReactSphere Kraków, April 2018
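Note on the build changes: a connector opts in to the Scala 3 cross-build by adding Scala3.settings to its project definition, either as an extra argument to alpakkaProject or appended with .settings. A minimal sketch of the two forms used in this patch series (Scala3.settings itself lives under project/ and is not shown here; it is assumed to add the Scala 3 version to the connector's crossScalaVersions):

    // Settings passed directly to the project factory method
    lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, Scala3.settings)

    // Settings appended afterwards
    lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis)
      .settings(Scala3.settings)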
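Note on the source changes: most of the test-code edits follow two mechanical patterns that let the same sources compile on both Scala 2.13 and Scala 3. A self-contained sketch of both patterns (the Book case class mirrors the one in ElasticsearchSpecUtils; the surrounding object and values are illustrative only):

    import spray.json.{ DefaultJsonProtocol, JsonFormat }

    object Scala3MigrationSketch extends DefaultJsonProtocol {
      final case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10)

      // Scala 3 no longer eta-expands a case-class companion object into a function value,
      // so the companion's apply method is passed explicitly:
      implicit val bookFormat: JsonFormat[Book] = jsonFormat3(Book.apply)

      // Scala 3 requires parentheses around a type-annotated lambda parameter,
      // hence `{ (book: Book) => ... }` instead of `{ book: Book => ... }`:
      val titles: List[String] = List(Book("Akka in Action")).map { (book: Book) => book.title }
    }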