Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build with Scala 2.13 #1530

Merged
merged 15 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
name: "Build code style check (fixed with `sbt scalafmtSbt`)"
- env: CMD="++2.11.12 Test/compile"
name: "Compile all tests (with Scala 2.11)"
- env: CMD="++2.13.0-M5 Test/compile"
name: "Compile all tests (with Scala 2.13)"
- env: CMD="unidoc"
name: "Create all API docs"
- env: CMD="docs/Paradox/paradox"
Expand Down
64 changes: 46 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ lazy val alpakka = project
(ScalaUnidoc / unidoc / fullClasspath).value
.filterNot(_.data.getAbsolutePath.contains("protobuf-java-2.5.0.jar"))
},
ScalaUnidoc / unidoc / unidocProjectFilter := inAnyProject -- inProjects(`doc-examples`)
ScalaUnidoc / unidoc / unidocProjectFilter := inAnyProject -- inProjects(`doc-examples`),
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)

lazy val amqp = alpakkaProject("amqp", "amqp", Dependencies.Amqp)
Expand All @@ -93,7 +94,7 @@ lazy val cassandra = alpakkaProject("cassandra", "cassandra", Dependencies.Cassa
lazy val couchbase =
alpakkaProject("couchbase", "couchbase", Dependencies.Couchbase, parallelExecution in Test := false)

lazy val csv = alpakkaProject("csv", "csv", Dependencies.Csv)
lazy val csv = alpakkaProject("csv", "csv", Dependencies.Csv, crossScalaVersions -= Dependencies.Scala213)

lazy val dynamodb = alpakkaProject("dynamodb", "aws.dynamodb", Dependencies.DynamoDB)

Expand All @@ -115,11 +116,25 @@ lazy val ftp = alpakkaProject(
parallelExecution in Test := false,
fork in Test := true,
// To avoid potential blocking in machines with low entropy (default is `/dev/random`)
javaOptions in Test += "-Djava.security.egd=file:/dev/./urandom"
javaOptions in Test += "-Djava.security.egd=file:/dev/./urandom",
crossScalaVersions -= Dependencies.Scala213
)

lazy val geode =
alpakkaProject("geode", "geode", Dependencies.Geode, fork in Test := true, parallelExecution in Test := false)
alpakkaProject(
"geode",
"geode",
Dependencies.Geode,
fork in Test := true,
parallelExecution in Test := false,
unmanagedSourceDirectories in Compile ++= {
val sourceDir = (sourceDirectory in Compile).value
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n >= 12 => Seq(sourceDir / "scala-2.12+")
case _ => Seq.empty
}
},
)

lazy val googleCloudPubSub = alpakkaProject(
"google-cloud-pub-sub",
Expand All @@ -141,7 +156,8 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
javaAgents += Dependencies.GooglePubSubGrpcAlpnAgent % "test",
// for the ExampleApp in the tests
connectInput in run := true,
Compile / compile / scalacOptions += "-P:silencer:pathFilters=src_managed"
Compile / compile / scalacOptions += "-P:silencer:pathFilters=src_managed",
crossScalaVersions -= Dependencies.Scala213
).enablePlugins(AkkaGrpcPlugin, JavaAgent)

lazy val googleFcm = alpakkaProject(
Expand All @@ -155,7 +171,11 @@ lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, fork in Te

lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, parallelExecution in Test := false)

lazy val ironmq = alpakkaProject("ironmq", "ironmq", Dependencies.IronMq, fork in Test := true)
lazy val ironmq = alpakkaProject("ironmq",
"ironmq",
Dependencies.IronMq,
fork in Test := true,
crossScalaVersions -= Dependencies.Scala213)

lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, parallelExecution in Test := false)

Expand All @@ -169,7 +189,8 @@ lazy val kinesis = alpakkaProject("kinesis",

lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu, fork in Test := false)

lazy val mongodb = alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb)
lazy val mongodb =
alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb, crossScalaVersions -= Dependencies.Scala213)

lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt)

Expand All @@ -193,21 +214,27 @@ lazy val springWeb = alpakkaProject("spring-web", "spring.web", Dependencies.Spr

lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs")

lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick)
lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick, crossScalaVersions -= Dependencies.Scala213)

lazy val sns = alpakkaProject("sns",
"aws.sns",
Dependencies.Sns,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false)
lazy val sns = alpakkaProject(
"sns",
"aws.sns",
Dependencies.Sns,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false,
crossScalaVersions -= Dependencies.Scala213
)

lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr, parallelExecution in Test := false)

lazy val sqs = alpakkaProject("sqs",
"aws.sqs",
Dependencies.Sqs,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false)
lazy val sqs = alpakkaProject(
"sqs",
"aws.sqs",
Dependencies.Sqs,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false,
crossScalaVersions -= Dependencies.Scala213
)

lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse)

Expand Down Expand Up @@ -297,6 +324,7 @@ lazy val `doc-examples` = project
name := s"akka-stream-alpakka-doc-examples",
publish / skip := true,
whitesourceIgnore := true,
crossScalaVersions -= Dependencies.Scala213,
Dependencies.`Doc-examples`
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class ItemSpec extends TestKit(ActorSystem("ItemSpec")) with AsyncWordSpecLike w
"9) get two items in a transaction" ignore assertAllStagesStopped {
DynamoDb.single(transactGetItemsRequest).map { results =>
results.getResponses.size shouldBe 2
val Seq(a, b) = results.getResponses.asScala
a.getItem.get(sortCol) shouldEqual N(0)
b.getItem.get(sortCol) shouldEqual N(1)
val responses = results.getResponses.asScala
responses.head.getItem.get(sortCol) shouldEqual N(0)
responses.last.getItem.get(sortCol) shouldEqual N(1)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.geode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final class HTableSettings[T] private (val conf: Configuration,
* Java Api
*/
def withColumnFamilies(columnFamilies: java.util.List[String]): HTableSettings[T] =
copy(columnFamilies = columnFamilies.asScala.to[immutable.Seq])
copy(columnFamilies = columnFamilies.asScala.toIndexedSeq)

def withConverter(converter: T => immutable.Seq[Mutation]): HTableSettings[T] =
copy(converter = converter)
Expand All @@ -39,7 +39,7 @@ final class HTableSettings[T] private (val conf: Configuration,
* Java Api
*/
def withConverter(converter: java.util.function.Function[T, java.util.List[Mutation]]): HTableSettings[T] =
copy(converter = converter.asScala(_).asScala.to[immutable.Seq])
copy(converter = converter.asScala(_).asScala.toIndexedSeq)

override def toString: String =
"HTableSettings(" +
Expand Down Expand Up @@ -76,8 +76,5 @@ object HTableSettings {
tableName: TableName,
columnFamilies: java.util.List[String],
converter: java.util.function.Function[T, java.util.List[Mutation]]): HTableSettings[T] =
HTableSettings(conf,
tableName,
columnFamilies.asScala.to[immutable.Seq],
converter.asScala(_).asScala.to[immutable.Seq])
HTableSettings(conf, tableName, columnFamilies.asScala.toIndexedSeq, converter.asScala(_).asScala.toIndexedSeq)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to have the newlines here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's scalafmt. :)

}
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ object JavaTestUtils extends TestUtils {
assertEquals(getFiles(fs).size(), logs.size())

def readLogs(fs: FileSystem, logs: Sequence[RotationMessage]): Sequence[String] =
ScalaTestUtils.readLogs(fs, logs.asScala).asJava
ScalaTestUtils.readLogs(fs, logs.asScala.toIndexedSeq).asJava

def readLogsWithFlatten(fs: FileSystem, logs: Sequence[RotationMessage]): Sequence[Char] =
ScalaTestUtils.readLogsWithFlatten(fs, logs.asScala).asJava
ScalaTestUtils.readLogsWithFlatten(fs, logs.asScala.toIndexedSeq).asJava

def generateFakeContentWithPartitions(count: Double, bytes: Long, partition: Int): Sequence[ByteString] =
ScalaTestUtils.generateFakeContentWithPartitions(count, bytes, partition).asJava
Expand All @@ -209,8 +209,9 @@ object JavaTestUtils extends TestUtils {
logs: Sequence[RotationMessage],
codec: CompressionCodec): Assertion = {
val pureContent: String = content.asScala.map(_.utf8String).mkString
val contentFromHdfsWithCodec: String = ScalaTestUtils.readLogsWithCodec(fs, logs.asScala, codec).mkString
val contentFromHdfs: String = ScalaTestUtils.readLogs(fs, logs.asScala).mkString
val contentFromHdfsWithCodec: String =
ScalaTestUtils.readLogsWithCodec(fs, logs.asScala.toIndexedSeq, codec).mkString
val contentFromHdfs: String = ScalaTestUtils.readLogs(fs, logs.asScala.toIndexedSeq).mkString
assertNotEquals(contentFromHdfs, pureContent)
assertEquals(contentFromHdfsWithCodec, pureContent)
}
Expand All @@ -225,7 +226,7 @@ object JavaTestUtils extends TestUtils {
ScalaTestUtils.generateFakeContentForSequence(count, bytes).map { case (k, v) => akka.japi.Pair(k, v) }.asJava

def readLogsWithCodec(fs: FileSystem, logs: Sequence[RotationMessage], codec: CompressionCodec): Sequence[String] =
ScalaTestUtils.readLogsWithCodec(fs, logs.asScala, codec).asJava
ScalaTestUtils.readLogsWithCodec(fs, logs.asScala.toIndexedSeq, codec).asJava

def verifyFlattenContent(fs: FileSystem, logs: Sequence[RotationMessage], content: Sequence[ByteString]): Unit =
assertArrayEquals(readLogsWithFlatten(fs, logs).toArray, content.asScala.flatMap(_.utf8String).asJava.toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ private final class SoftReferenceCache[K, V <: AnyRef] {
}

private def purgeCache(): Unit =
cache --= cache.collect { case (key, ref) if ref.get.isEmpty => key }.to[Vector]
cache --= cache.collect { case (key, ref) if ref.get.isEmpty => key }.toVector
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import javax.jms.{JMSException, TextMessage}
import org.scalatest.Inspectors._

import scala.annotation.tailrec
import scala.collection.{mutable, SortedSet}
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -280,7 +280,7 @@ class JmsAckConnectorsSpec extends JmsSpec {

killSwitch2.shutdown()

resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"ensure no message loss when aborting a stream" in withServer() { server =>
Expand Down Expand Up @@ -360,7 +360,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
implicit val _ = Ordering.by { s: String =>
s.toInt
}
resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"shutdown when waiting to acknowledge messages" in withServer() { server =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import javax.jms.{JMSException, TextMessage}
import org.scalatest.Inspectors._

import scala.annotation.tailrec
import scala.collection.{immutable, mutable, SortedSet}
import scala.collection.{immutable, mutable}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -287,7 +287,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSpec {
implicit val _ = Ordering.by { s: String =>
s.toInt
}
resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"ensure no message loss when aborting a stream" in withConnectionFactory() { connectionFactory =>
Expand Down Expand Up @@ -372,7 +372,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSpec {
implicit val _ = Ordering.by { s: String =>
s.toInt
}
resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ private[kinesisfirehose] object KinesisFirehoseFlowStage {
result.getRequestResponses.asScala
.zip(request.getRecords.asScala)
.filter(_._1.getErrorCode != null)
.toIndexedSeq
)
} else {
retryRecordsCallback(Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync
import com.amazonaws.services.kinesisfirehose.model.{PutRecordBatchResponseEntry, Record}

import scala.collection.JavaConverters._
import scala.collection.immutable.{Iterable, Queue}
import scala.collection.immutable.Queue
import scala.concurrent.duration._

object KinesisFirehoseFlow {
Expand All @@ -33,7 +33,7 @@ object KinesisFirehoseFlow {
)
)
.mapAsync(settings.parallelism)(identity)
.mapConcat(_.getRequestResponses.asScala.to[Iterable])
.mapConcat(_.getRequestResponses.asScala.toIndexedSeq)
.filter(_.getErrorCode == null)

private def getByteSize(record: Record): Int = record.getData.position
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ final case class Subscribe @InternalApi private[streaming] (packetId: PacketId,
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
def this(topicFilters: java.util.List[AkkaPair[String, Integer]]) =
this(PacketId(0), topicFilters.asScala.map(v => v.first -> ControlPacketFlags(v.second)))
this(PacketId(0), topicFilters.asScala.toIndexedSeq.map(v => v.first -> ControlPacketFlags(v.second)))

/**
* A convenience for subscribing to a single topic with at-least-once semantics
Expand All @@ -388,7 +388,7 @@ final case class SubAck(packetId: PacketId, returnCodes: Seq[ControlPacketFlags]
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
def this(packetId: PacketId, returnCodes: java.util.List[Integer]) =
this(packetId, returnCodes.asScala.map(v => ControlPacketFlags(v)))
this(packetId, returnCodes.asScala.toIndexedSeq.map(v => ControlPacketFlags(v)))
}

object Unsubscribe {
Expand Down Expand Up @@ -428,7 +428,7 @@ final case class Unsubscribe @InternalApi private[streaming] (packetId: PacketId
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
def this(topicFilters: java.util.List[String]) =
this(PacketId(0), topicFilters.asScala)
this(PacketId(0), topicFilters.asScala.toIndexedSeq)

/**
* A convenience for unsubscribing from a single topic
Expand Down
7 changes: 5 additions & 2 deletions project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object Common extends AutoPlugin {
whitesourceGroup := Whitesource.Group.Community,
crossVersion := CrossVersion.binary,
crossScalaVersions := Dependencies.ScalaVersions,
scalaVersion := crossScalaVersions.value.head,
scalaVersion := Dependencies.Scala212,
scalacOptions ++= Seq(
"-encoding",
"UTF-8",
Expand All @@ -37,11 +37,14 @@ object Common extends AutoPlugin {
"-deprecation",
//"-Xfatal-warnings",
"-Xlint",
"-Yno-adapted-args",
"-Ywarn-dead-code",
"-Xfuture",
"-target:jvm-1.8"
),
scalacOptions ++= (scalaVersion.value match {
case Dependencies.Scala213 => Seq.empty[String]
case _ => Seq("-Yno-adapted-args")
}),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
"-doc-title",
"Alpakka",
Expand Down
Loading