Skip to content

Commit

Permalink
Build with Scala 2.13 (#1530)
Browse files Browse the repository at this point in the history
This enables cross building for Scala 2.13.0-M5. Connectors that have either missing dependencies or are problematic to cross-build, have cross-publishing disabled.
  • Loading branch information
2m authored Feb 27, 2019
1 parent f6b6cb8 commit a0a5c69
Show file tree
Hide file tree
Showing 20 changed files with 123 additions and 84 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
name: "Build code style check (fixed with `sbt scalafmtSbt`)"
- env: CMD="++2.11.12 Test/compile"
name: "Compile all tests (with Scala 2.11)"
- env: CMD="++2.13.0-M5 Test/compile"
name: "Compile all tests (with Scala 2.13)"
- env: CMD="unidoc"
name: "Create all API docs"
- env: CMD="docs/Paradox/paradox"
Expand Down
64 changes: 46 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ lazy val alpakka = project
(ScalaUnidoc / unidoc / fullClasspath).value
.filterNot(_.data.getAbsolutePath.contains("protobuf-java-2.5.0.jar"))
},
ScalaUnidoc / unidoc / unidocProjectFilter := inAnyProject -- inProjects(`doc-examples`)
ScalaUnidoc / unidoc / unidocProjectFilter := inAnyProject -- inProjects(`doc-examples`),
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)

lazy val amqp = alpakkaProject("amqp", "amqp", Dependencies.Amqp)
Expand All @@ -93,7 +94,7 @@ lazy val cassandra = alpakkaProject("cassandra", "cassandra", Dependencies.Cassa
lazy val couchbase =
alpakkaProject("couchbase", "couchbase", Dependencies.Couchbase, parallelExecution in Test := false)

lazy val csv = alpakkaProject("csv", "csv", Dependencies.Csv)
lazy val csv = alpakkaProject("csv", "csv", Dependencies.Csv, crossScalaVersions -= Dependencies.Scala213)

lazy val dynamodb = alpakkaProject("dynamodb", "aws.dynamodb", Dependencies.DynamoDB)

Expand All @@ -115,11 +116,25 @@ lazy val ftp = alpakkaProject(
parallelExecution in Test := false,
fork in Test := true,
// To avoid potential blocking in machines with low entropy (default is `/dev/random`)
javaOptions in Test += "-Djava.security.egd=file:/dev/./urandom"
javaOptions in Test += "-Djava.security.egd=file:/dev/./urandom",
crossScalaVersions -= Dependencies.Scala213
)

lazy val geode =
alpakkaProject("geode", "geode", Dependencies.Geode, fork in Test := true, parallelExecution in Test := false)
alpakkaProject(
"geode",
"geode",
Dependencies.Geode,
fork in Test := true,
parallelExecution in Test := false,
unmanagedSourceDirectories in Compile ++= {
val sourceDir = (sourceDirectory in Compile).value
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n >= 12 => Seq(sourceDir / "scala-2.12+")
case _ => Seq.empty
}
},
)

lazy val googleCloudPubSub = alpakkaProject(
"google-cloud-pub-sub",
Expand All @@ -141,7 +156,8 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
javaAgents += Dependencies.GooglePubSubGrpcAlpnAgent % "test",
// for the ExampleApp in the tests
connectInput in run := true,
Compile / compile / scalacOptions += "-P:silencer:pathFilters=src_managed"
Compile / compile / scalacOptions += "-P:silencer:pathFilters=src_managed",
crossScalaVersions -= Dependencies.Scala213
).enablePlugins(AkkaGrpcPlugin, JavaAgent)

lazy val googleFcm = alpakkaProject(
Expand All @@ -155,7 +171,11 @@ lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, fork in Te

lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, parallelExecution in Test := false)

lazy val ironmq = alpakkaProject("ironmq", "ironmq", Dependencies.IronMq, fork in Test := true)
lazy val ironmq = alpakkaProject("ironmq",
"ironmq",
Dependencies.IronMq,
fork in Test := true,
crossScalaVersions -= Dependencies.Scala213)

lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, parallelExecution in Test := false)

Expand All @@ -169,7 +189,8 @@ lazy val kinesis = alpakkaProject("kinesis",

lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu, fork in Test := false)

lazy val mongodb = alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb)
lazy val mongodb =
alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb, crossScalaVersions -= Dependencies.Scala213)

lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt)

Expand All @@ -193,21 +214,27 @@ lazy val springWeb = alpakkaProject("spring-web", "spring.web", Dependencies.Spr

lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs")

lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick)
lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick, crossScalaVersions -= Dependencies.Scala213)

lazy val sns = alpakkaProject("sns",
"aws.sns",
Dependencies.Sns,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false)
lazy val sns = alpakkaProject(
"sns",
"aws.sns",
Dependencies.Sns,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false,
crossScalaVersions -= Dependencies.Scala213
)

lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr, parallelExecution in Test := false)

lazy val sqs = alpakkaProject("sqs",
"aws.sqs",
Dependencies.Sqs,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false)
lazy val sqs = alpakkaProject(
"sqs",
"aws.sqs",
Dependencies.Sqs,
// For mockito https://github.com/akka/alpakka/issues/390
parallelExecution in Test := false,
crossScalaVersions -= Dependencies.Scala213
)

lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse)

Expand Down Expand Up @@ -297,6 +324,7 @@ lazy val `doc-examples` = project
name := s"akka-stream-alpakka-doc-examples",
publish / skip := true,
whitesourceIgnore := true,
crossScalaVersions -= Dependencies.Scala213,
Dependencies.`Doc-examples`
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class ItemSpec extends TestKit(ActorSystem("ItemSpec")) with AsyncWordSpecLike w
"9) get two items in a transaction" ignore assertAllStagesStopped {
DynamoDb.single(transactGetItemsRequest).map { results =>
results.getResponses.size shouldBe 2
val Seq(a, b) = results.getResponses.asScala
a.getItem.get(sortCol) shouldEqual N(0)
b.getItem.get(sortCol) shouldEqual N(1)
val responses = results.getResponses.asScala
responses.head.getItem.get(sortCol) shouldEqual N(0)
responses.last.getItem.get(sortCol) shouldEqual N(1)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.geode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final class HTableSettings[T] private (val conf: Configuration,
* Java Api
*/
def withColumnFamilies(columnFamilies: java.util.List[String]): HTableSettings[T] =
copy(columnFamilies = columnFamilies.asScala.to[immutable.Seq])
copy(columnFamilies = columnFamilies.asScala.toIndexedSeq)

def withConverter(converter: T => immutable.Seq[Mutation]): HTableSettings[T] =
copy(converter = converter)
Expand All @@ -39,7 +39,7 @@ final class HTableSettings[T] private (val conf: Configuration,
* Java Api
*/
def withConverter(converter: java.util.function.Function[T, java.util.List[Mutation]]): HTableSettings[T] =
copy(converter = converter.asScala(_).asScala.to[immutable.Seq])
copy(converter = converter.asScala(_).asScala.toIndexedSeq)

override def toString: String =
"HTableSettings(" +
Expand Down Expand Up @@ -76,8 +76,5 @@ object HTableSettings {
tableName: TableName,
columnFamilies: java.util.List[String],
converter: java.util.function.Function[T, java.util.List[Mutation]]): HTableSettings[T] =
HTableSettings(conf,
tableName,
columnFamilies.asScala.to[immutable.Seq],
converter.asScala(_).asScala.to[immutable.Seq])
HTableSettings(conf, tableName, columnFamilies.asScala.toIndexedSeq, converter.asScala(_).asScala.toIndexedSeq)
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ object JavaTestUtils extends TestUtils {
assertEquals(getFiles(fs).size(), logs.size())

def readLogs(fs: FileSystem, logs: Sequence[RotationMessage]): Sequence[String] =
ScalaTestUtils.readLogs(fs, logs.asScala).asJava
ScalaTestUtils.readLogs(fs, logs.asScala.toIndexedSeq).asJava

def readLogsWithFlatten(fs: FileSystem, logs: Sequence[RotationMessage]): Sequence[Char] =
ScalaTestUtils.readLogsWithFlatten(fs, logs.asScala).asJava
ScalaTestUtils.readLogsWithFlatten(fs, logs.asScala.toIndexedSeq).asJava

def generateFakeContentWithPartitions(count: Double, bytes: Long, partition: Int): Sequence[ByteString] =
ScalaTestUtils.generateFakeContentWithPartitions(count, bytes, partition).asJava
Expand All @@ -209,8 +209,9 @@ object JavaTestUtils extends TestUtils {
logs: Sequence[RotationMessage],
codec: CompressionCodec): Assertion = {
val pureContent: String = content.asScala.map(_.utf8String).mkString
val contentFromHdfsWithCodec: String = ScalaTestUtils.readLogsWithCodec(fs, logs.asScala, codec).mkString
val contentFromHdfs: String = ScalaTestUtils.readLogs(fs, logs.asScala).mkString
val contentFromHdfsWithCodec: String =
ScalaTestUtils.readLogsWithCodec(fs, logs.asScala.toIndexedSeq, codec).mkString
val contentFromHdfs: String = ScalaTestUtils.readLogs(fs, logs.asScala.toIndexedSeq).mkString
assertNotEquals(contentFromHdfs, pureContent)
assertEquals(contentFromHdfsWithCodec, pureContent)
}
Expand All @@ -225,7 +226,7 @@ object JavaTestUtils extends TestUtils {
ScalaTestUtils.generateFakeContentForSequence(count, bytes).map { case (k, v) => akka.japi.Pair(k, v) }.asJava

def readLogsWithCodec(fs: FileSystem, logs: Sequence[RotationMessage], codec: CompressionCodec): Sequence[String] =
ScalaTestUtils.readLogsWithCodec(fs, logs.asScala, codec).asJava
ScalaTestUtils.readLogsWithCodec(fs, logs.asScala.toIndexedSeq, codec).asJava

def verifyFlattenContent(fs: FileSystem, logs: Sequence[RotationMessage], content: Sequence[ByteString]): Unit =
assertArrayEquals(readLogsWithFlatten(fs, logs).toArray, content.asScala.flatMap(_.utf8String).asJava.toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ private final class SoftReferenceCache[K, V <: AnyRef] {
}

private def purgeCache(): Unit =
cache --= cache.collect { case (key, ref) if ref.get.isEmpty => key }.to[Vector]
cache --= cache.collect { case (key, ref) if ref.get.isEmpty => key }.toVector
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import javax.jms.{JMSException, TextMessage}
import org.scalatest.Inspectors._

import scala.annotation.tailrec
import scala.collection.{mutable, SortedSet}
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -280,7 +280,7 @@ class JmsAckConnectorsSpec extends JmsSpec {

killSwitch2.shutdown()

resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"ensure no message loss when aborting a stream" in withServer() { server =>
Expand Down Expand Up @@ -360,7 +360,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
implicit val _ = Ordering.by { s: String =>
s.toInt
}
resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"shutdown when waiting to acknowledge messages" in withServer() { server =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import javax.jms.{JMSException, TextMessage}
import org.scalatest.Inspectors._

import scala.annotation.tailrec
import scala.collection.{immutable, mutable, SortedSet}
import scala.collection.{immutable, mutable}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -287,7 +287,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSpec {
implicit val _ = Ordering.by { s: String =>
s.toInt
}
resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"ensure no message loss when aborting a stream" in withConnectionFactory() { connectionFactory =>
Expand Down Expand Up @@ -372,7 +372,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSpec {
implicit val _ = Ordering.by { s: String =>
s.toInt
}
resultList.to[SortedSet] should contain theSameElementsAs numsIn.map(_.toString)
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ private[kinesisfirehose] object KinesisFirehoseFlowStage {
result.getRequestResponses.asScala
.zip(request.getRecords.asScala)
.filter(_._1.getErrorCode != null)
.toIndexedSeq
)
} else {
retryRecordsCallback(Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync
import com.amazonaws.services.kinesisfirehose.model.{PutRecordBatchResponseEntry, Record}

import scala.collection.JavaConverters._
import scala.collection.immutable.{Iterable, Queue}
import scala.collection.immutable.Queue
import scala.concurrent.duration._

object KinesisFirehoseFlow {
Expand All @@ -33,7 +33,7 @@ object KinesisFirehoseFlow {
)
)
.mapAsync(settings.parallelism)(identity)
.mapConcat(_.getRequestResponses.asScala.to[Iterable])
.mapConcat(_.getRequestResponses.asScala.toIndexedSeq)
.filter(_.getErrorCode == null)

private def getByteSize(record: Record): Int = record.getData.position
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ final case class Subscribe @InternalApi private[streaming] (packetId: PacketId,
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
def this(topicFilters: java.util.List[AkkaPair[String, Integer]]) =
this(PacketId(0), topicFilters.asScala.map(v => v.first -> ControlPacketFlags(v.second)))
this(PacketId(0), topicFilters.asScala.toIndexedSeq.map(v => v.first -> ControlPacketFlags(v.second)))

/**
* A convenience for subscribing to a single topic with at-least-once semantics
Expand All @@ -388,7 +388,7 @@ final case class SubAck(packetId: PacketId, returnCodes: Seq[ControlPacketFlags]
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
def this(packetId: PacketId, returnCodes: java.util.List[Integer]) =
this(packetId, returnCodes.asScala.map(v => ControlPacketFlags(v)))
this(packetId, returnCodes.asScala.toIndexedSeq.map(v => ControlPacketFlags(v)))
}

object Unsubscribe {
Expand Down Expand Up @@ -428,7 +428,7 @@ final case class Unsubscribe @InternalApi private[streaming] (packetId: PacketId
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
def this(topicFilters: java.util.List[String]) =
this(PacketId(0), topicFilters.asScala)
this(PacketId(0), topicFilters.asScala.toIndexedSeq)

/**
* A convenience for unsubscribing from a single topic
Expand Down
7 changes: 5 additions & 2 deletions project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object Common extends AutoPlugin {
whitesourceGroup := Whitesource.Group.Community,
crossVersion := CrossVersion.binary,
crossScalaVersions := Dependencies.ScalaVersions,
scalaVersion := crossScalaVersions.value.head,
scalaVersion := Dependencies.Scala212,
scalacOptions ++= Seq(
"-encoding",
"UTF-8",
Expand All @@ -37,11 +37,14 @@ object Common extends AutoPlugin {
"-deprecation",
//"-Xfatal-warnings",
"-Xlint",
"-Yno-adapted-args",
"-Ywarn-dead-code",
"-Xfuture",
"-target:jvm-1.8"
),
scalacOptions ++= (scalaVersion.value match {
case Dependencies.Scala213 => Seq.empty[String]
case _ => Seq("-Yno-adapted-args")
}),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
"-doc-title",
"Alpakka",
Expand Down
Loading

0 comments on commit a0a5c69

Please sign in to comment.