diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml new file mode 100644 index 00000000..4c604811 --- /dev/null +++ b/.github/workflows/cd.yml @@ -0,0 +1,68 @@ +name: CD + +on: + push: + tags: + - '*' + +jobs: + cd: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: coursier/cache-action@v5 + + - name: Set up JDK 8 + uses: actions/setup-java@v1 + with: + java-version: 8 + + - name: Create GitHub release & Attach artifacts + uses: softprops/action-gh-release@v1 + with: + draft: true + prerelease: ${{ contains(steps.version.outputs.tag, 'M') }} + name: ${{ steps.version.outputs.tag }} + tag_name: ${{ steps.version.outputs.tag }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Publish common_2.12 to Sonatype + run: | + sbt "project common" clean ci-release + continue-on-error: true + env: + PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }} + PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }} + SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }} + SONATYPE_USERNAME: ${{ secrets.SONA_USER }} + + - name: Publish grpc_2.12 to Sonatype + run: | + sbt "project grpc" clean ci-release + continue-on-error: true + env: + PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }} + PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }} + SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }} + SONATYPE_USERNAME: ${{ secrets.SONA_USER }} + + - name: Publish common_2.13 to Sonatype + run: | + sbt "project common" ++2.13.5! clean ci-release + continue-on-error: true + env: + PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }} + PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }} + SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }} + SONATYPE_USERNAME: ${{ secrets.SONA_USER }} + + - name: Publish grpc_2.13 to Sonatype + run: | + sbt "project grpc" ++2.13.5! clean ci-release + continue-on-error: true + env: + PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }} + PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }} + SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }} + SONATYPE_USERNAME: ${{ secrets.SONA_USER }} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..b39bce3d --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,23 @@ +name: CI + +on: + push: + branches: + - master + pull_request: + +jobs: + ci: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - uses: coursier/cache-action@v5 + + - name: Set up JDK 8 + uses: actions/setup-java@v1 + with: + java-version: 8 + + - name: Run tests + run: sbt "project grpc" clean +test diff --git a/.github/workflows/snyk.yml b/.github/workflows/snyk.yml new file mode 100644 index 00000000..baab4e32 --- /dev/null +++ b/.github/workflows/snyk.yml @@ -0,0 +1,21 @@ +name: Snyk + +on: + push: + branches: [ master ] + +jobs: + snyk: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - uses: coursier/cache-action@v6 + + - name: Run Snyk to check for vulnerabilities + uses: snyk/actions/scala@master + with: + command: monitor + args: --project-name=fs2-google-pubsub + env: + SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} diff --git a/README.md b/README.md index 3032d831..08e04d50 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ [![Build Status](https://travis-ci.org/permutive/fs2-google-pubsub.svg?branch=master)](https://travis-ci.org/permutive/fs2-google-pubsub) [![Maven Central](https://img.shields.io/maven-central/v/com.permutive/fs2-google-pubsub_2.12.svg)](http://search.maven.org/#search%7Cga%7C1%7Cfs2-google-pubsub) +This repository is a fork. The original repository is at https://github.com/permutive/fs2-google-pubsub. + [Google Cloud Pub/Sub][0] stream-based client built on top of [cats-effect][1], [fs2][2] and [http4s][6]. `fs2-google-pubsub` provides a mix of APIs, depending on the exact module. Consumers are provided as `fs2` streams, diff --git a/build.sbt b/build.sbt index e1f0368c..c8332e55 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ def priorTo2_13(scalaVersion: String): Boolean = } lazy val commonSettings = Seq( - organization := "com.permutive", + organization := "com.snowplowanalytics", scalaVersion := Dependencies.Versions.scala212, crossScalaVersions := Seq(Dependencies.Versions.scala212, Dependencies.Versions.scala213), javacOptions in (Compile, compile) ++= Seq("-source", "1.8", "-target", "1.8"), @@ -56,7 +56,7 @@ lazy val commonSettings = Seq( "-opt-inline-from:com.permutive.**", "-opt-warnings", // Lint after expansion so that implicits used in macros are not flagged as unused - "-Ywarn-macros:after", + "-Ywarn-macros:after" ), scalacOptions ++= { if (priorTo2_13(scalaVersion.value)) @@ -70,11 +70,11 @@ lazy val commonSettings = Seq( "-Ywarn-nullary-unit", // Warn when nullary methods return Unit. "-Xlint:by-name-right-associative", // By-name parameter of right associative operator. "-Xlint:unsound-match", // Pattern match may not be typesafe. - "-Xfuture", // Turn on future language features. + "-Xfuture" // Turn on future language features. ) else Seq( - "-Ymacro-annotations", + "-Ymacro-annotations" ) } ) @@ -85,7 +85,7 @@ lazy val common = (project in file("fs2-google-pubsub")) commonSettings, publishSettings, libraryDependencies ++= Dependencies.commonDependencies, - libraryDependencies ++= Dependencies.testsDependencies, + libraryDependencies ++= Dependencies.testsDependencies ) lazy val http = (project in file("fs2-google-pubsub-http")) @@ -95,7 +95,7 @@ lazy val http = (project in file("fs2-google-pubsub-http")) commonSettings, publishSettings, libraryDependencies ++= Dependencies.httpDependencies, - libraryDependencies ++= Dependencies.testsDependencies, + libraryDependencies ++= Dependencies.testsDependencies ) lazy val grpc = (project in file("fs2-google-pubsub-grpc")) @@ -105,7 +105,7 @@ lazy val grpc = (project in file("fs2-google-pubsub-grpc")) commonSettings, publishSettings, libraryDependencies ++= Dependencies.grpcDependencies, - libraryDependencies ++= Dependencies.testsDependencies, + libraryDependencies ++= Dependencies.testsDependencies ) lazy val root = (project in file(".")) @@ -115,6 +115,7 @@ lazy val root = (project in file(".")) commonSettings, publishSettings, publish / skip := true, + crossScalaVersions := Nil ) .aggregate( common, @@ -122,35 +123,20 @@ lazy val root = (project in file(".")) grpc ) -lazy val publishSettings = Seq( - releaseCrossBuild := true, - releaseVcsSign := true, - releasePublishArtifactsAction := PgpKeys.publishSigned.value, - homepage := Some(url("https://github.com/permutive/fs2-google-pubsub")), - licenses := Seq("Apache 2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")), - publishMavenStyle := true, - publishArtifact in Test := false, - pomIncludeRepository := { _ => - false - }, - publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots".at(nexus + "content/repositories/snapshots")) - else - Some("releases".at(nexus + "service/local/staging/deploy/maven2")) - }, - autoAPIMappings := true, - scmInfo := Some( - ScmInfo( - url("https://github.com/permutive/fs2-google-pubsub"), - "scm:git:git@github.com:permutive/fs2-google-pubsub.git" - ) - ), +lazy val publishSettings = Seq[Setting[_]]( + publishArtifact := true, + pomIncludeRepository := { _ => false }, + ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix + organization := "com.snowplowanalytics", + homepage := Some(url("https://snowplowanalytics.com")), + licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")), + scmInfo := Some(ScmInfo(url("https://github.com/snowplow-incubator/fs2-google-pubsub"), "scm:git@github.com:snowplow-incubator/fs2-google-pubsub.git")), + Test / publishArtifact := false, developers := List( Developer("cremboc", "Paulius Imbrasas", "", url("https://github.com/cremboc")), Developer("TimWSpence", "Tim Spence", "", url("https://github.com/TimWSpence")), Developer("bastewart", "Ben Stewart", "", url("https://github.com/bastewart")), - Developer("travisbrown", "Travis Brown", "", url("https://twitter.com/travisbrown")) + Developer("travisbrown", "Travis Brown", "", url("https://twitter.com/travisbrown")), + Developer("Snowplow Analytics Ltd", "Snowplow Analytics Ltd", "support@snowplowanalytics.com", url("https://snowplowanalytics.com")) ) ) diff --git a/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala b/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala index 0587b01e..4238544a 100644 --- a/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala +++ b/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala @@ -4,7 +4,9 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit} import cats.effect.{Blocker, ContextShift, Resource, Sync} import cats.syntax.all._ +import com.google.api.core.ApiService import com.google.api.gax.batching.FlowControlSettings +import com.google.common.util.concurrent.MoreExecutors import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber} import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage} import com.permutive.pubsub.consumer.{Model => PublicModel} @@ -20,13 +22,13 @@ private[consumer] object PubsubSubscriber { config: PubsubGoogleConsumerConfig[F] )(implicit F: Sync[F] - ): Resource[F, BlockingQueue[Model.Record[F]]] = - Resource[F, BlockingQueue[Model.Record[F]]] { + ): Resource[F, BlockingQueue[Either[Throwable, Model.Record[F]]]] = + Resource[F, BlockingQueue[Either[Throwable, Model.Record[F]]]] { Sync[F].delay { - val messages = new LinkedBlockingQueue[Model.Record[F]](config.maxQueueSize) + val messages = new LinkedBlockingQueue[Either[Throwable, Model.Record[F]]](config.maxQueueSize) val receiver = new MessageReceiver { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = - messages.put(Model.Record(message, Sync[F].delay(consumer.ack()), Sync[F].delay(consumer.nack()))) + messages.put(Right(Model.Record(message, Sync[F].delay(consumer.ack()), Sync[F].delay(consumer.nack())))) } val subscriptionName = ProjectSubscriptionName.of(projectId.value, subscription.value) @@ -49,6 +51,7 @@ private[consumer] object PubsubSubscriber { .map(f => f(builder)) .getOrElse(builder) .build() + sub.addListener(new PubsubErrorListener(messages), MoreExecutors.directExecutor) val service = sub.startAsync() val shutdown = @@ -60,6 +63,11 @@ private[consumer] object PubsubSubscriber { } } + class PubsubErrorListener[R](messages: BlockingQueue[Either[Throwable, R]]) extends ApiService.Listener { + override def failed(from: ApiService.State, failure: Throwable): Unit = + messages.put(Left(failure)) + } + def subscribe[F[_]: Sync: ContextShift]( blocker: Blocker, projectId: PublicModel.ProjectId, @@ -68,6 +76,7 @@ private[consumer] object PubsubSubscriber { ): Stream[F, Model.Record[F]] = for { queue <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config)) - msg <- Stream.repeatEval(blocker.delay(queue.take())) + next <- Stream.repeatEval(blocker.delay(queue.take())) + msg <- next.fold(Stream.raiseError(_), Stream.emit(_)) } yield msg } diff --git a/project/build.properties b/project/build.properties index 0837f7a1..10fd9eee 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.13 +sbt.version=1.5.5 diff --git a/project/plugins.sbt b/project/plugins.sbt index 77017d34..757a2892 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,3 +4,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.7.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1") addSbtPlugin("io.chrisdavenport" % "sbt-mima-version-check" % "0.1.2") +addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7") diff --git a/version.sbt b/version.sbt deleted file mode 100644 index 48456ba3..00000000 --- a/version.sbt +++ /dev/null @@ -1 +0,0 @@ -version in ThisBuild := "0.17.0"