Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Release/0.18.0 #1

Open
wants to merge 3 commits into
base: release/0.17.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
name: CD

on:
push:
tags:
- '*'

jobs:
cd:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v5

- name: Set up JDK 8
uses: actions/setup-java@v1
with:
java-version: 8

- name: Create GitHub release & Attach artifacts
uses: softprops/action-gh-release@v1
with:
draft: true
prerelease: ${{ contains(steps.version.outputs.tag, 'M') }}
name: ${{ steps.version.outputs.tag }}
tag_name: ${{ steps.version.outputs.tag }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Publish common_2.12 to Sonatype
run: |
sbt "project common" clean ci-release
continue-on-error: true
env:
PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }}
SONATYPE_USERNAME: ${{ secrets.SONA_USER }}

- name: Publish grpc_2.12 to Sonatype
run: |
sbt "project grpc" clean ci-release
continue-on-error: true
env:
PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }}
SONATYPE_USERNAME: ${{ secrets.SONA_USER }}

- name: Publish common_2.13 to Sonatype
run: |
sbt "project common" ++2.13.5! clean ci-release
continue-on-error: true
env:
PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }}
SONATYPE_USERNAME: ${{ secrets.SONA_USER }}

- name: Publish grpc_2.13 to Sonatype
run: |
sbt "project grpc" ++2.13.5! clean ci-release
continue-on-error: true
env:
PGP_PASSPHRASE: ${{ secrets.SONA_PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }}
SONATYPE_USERNAME: ${{ secrets.SONA_USER }}
23 changes: 23 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: CI

on:
push:
branches:
- master
pull_request:

jobs:
ci:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v5

- name: Set up JDK 8
uses: actions/setup-java@v1
with:
java-version: 8

- name: Run tests
run: sbt "project grpc" clean +test
21 changes: 21 additions & 0 deletions .github/workflows/snyk.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Snyk

on:
push:
branches: [ master ]

jobs:
snyk:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v6

- name: Run Snyk to check for vulnerabilities
uses: snyk/actions/scala@master
with:
command: monitor
args: --project-name=fs2-google-pubsub
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
[![Build Status](https://travis-ci.org/permutive/fs2-google-pubsub.svg?branch=master)](https://travis-ci.org/permutive/fs2-google-pubsub)
[![Maven Central](https://img.shields.io/maven-central/v/com.permutive/fs2-google-pubsub_2.12.svg)](http://search.maven.org/#search%7Cga%7C1%7Cfs2-google-pubsub)

This repository is a fork. The original repository is at https://github.com/permutive/fs2-google-pubsub.

[Google Cloud Pub/Sub][0] stream-based client built on top of [cats-effect][1], [fs2][2] and [http4s][6].

`fs2-google-pubsub` provides a mix of APIs, depending on the exact module. Consumers are provided as `fs2` streams,
Expand Down
52 changes: 19 additions & 33 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def priorTo2_13(scalaVersion: String): Boolean =
}

lazy val commonSettings = Seq(
organization := "com.permutive",
organization := "com.snowplowanalytics",
scalaVersion := Dependencies.Versions.scala212,
crossScalaVersions := Seq(Dependencies.Versions.scala212, Dependencies.Versions.scala213),
javacOptions in (Compile, compile) ++= Seq("-source", "1.8", "-target", "1.8"),
Expand Down Expand Up @@ -56,7 +56,7 @@ lazy val commonSettings = Seq(
"-opt-inline-from:com.permutive.**",
"-opt-warnings",
// Lint after expansion so that implicits used in macros are not flagged as unused
"-Ywarn-macros:after",
"-Ywarn-macros:after"
),
scalacOptions ++= {
if (priorTo2_13(scalaVersion.value))
Expand All @@ -70,11 +70,11 @@ lazy val commonSettings = Seq(
"-Ywarn-nullary-unit", // Warn when nullary methods return Unit.
"-Xlint:by-name-right-associative", // By-name parameter of right associative operator.
"-Xlint:unsound-match", // Pattern match may not be typesafe.
"-Xfuture", // Turn on future language features.
"-Xfuture" // Turn on future language features.
)
else
Seq(
"-Ymacro-annotations",
"-Ymacro-annotations"
)
}
)
Expand All @@ -85,7 +85,7 @@ lazy val common = (project in file("fs2-google-pubsub"))
commonSettings,
publishSettings,
libraryDependencies ++= Dependencies.commonDependencies,
libraryDependencies ++= Dependencies.testsDependencies,
libraryDependencies ++= Dependencies.testsDependencies
)

lazy val http = (project in file("fs2-google-pubsub-http"))
Expand All @@ -95,7 +95,7 @@ lazy val http = (project in file("fs2-google-pubsub-http"))
commonSettings,
publishSettings,
libraryDependencies ++= Dependencies.httpDependencies,
libraryDependencies ++= Dependencies.testsDependencies,
libraryDependencies ++= Dependencies.testsDependencies
)

lazy val grpc = (project in file("fs2-google-pubsub-grpc"))
Expand All @@ -105,7 +105,7 @@ lazy val grpc = (project in file("fs2-google-pubsub-grpc"))
commonSettings,
publishSettings,
libraryDependencies ++= Dependencies.grpcDependencies,
libraryDependencies ++= Dependencies.testsDependencies,
libraryDependencies ++= Dependencies.testsDependencies
)

lazy val root = (project in file("."))
Expand All @@ -115,42 +115,28 @@ lazy val root = (project in file("."))
commonSettings,
publishSettings,
publish / skip := true,
crossScalaVersions := Nil
)
.aggregate(
common,
http,
grpc
)

lazy val publishSettings = Seq(
releaseCrossBuild := true,
releaseVcsSign := true,
releasePublishArtifactsAction := PgpKeys.publishSigned.value,
homepage := Some(url("https://github.com/permutive/fs2-google-pubsub")),
licenses := Seq("Apache 2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")),
publishMavenStyle := true,
publishArtifact in Test := false,
pomIncludeRepository := { _ =>
false
},
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots".at(nexus + "content/repositories/snapshots"))
else
Some("releases".at(nexus + "service/local/staging/deploy/maven2"))
},
autoAPIMappings := true,
scmInfo := Some(
ScmInfo(
url("https://github.com/permutive/fs2-google-pubsub"),
"scm:git:[email protected]:permutive/fs2-google-pubsub.git"
)
),
lazy val publishSettings = Seq[Setting[_]](
publishArtifact := true,
pomIncludeRepository := { _ => false },
ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix
organization := "com.snowplowanalytics",
homepage := Some(url("https://snowplowanalytics.com")),
licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")),
scmInfo := Some(ScmInfo(url("https://github.com/snowplow-incubator/fs2-google-pubsub"), "scm:[email protected]:snowplow-incubator/fs2-google-pubsub.git")),
Test / publishArtifact := false,
developers := List(
Developer("cremboc", "Paulius Imbrasas", "", url("https://github.com/cremboc")),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we cannot remove that? It looks like we re-attribute the whole code to us?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I think we can add ourselves to the list, but we shouldn't remove the existing developers.

It's probably also worth mentioning in the README that this is a fork and pointing to the original repository for anyone who finds this (and I'd make it quite prominent), that just feels like the right thing to do.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks both, I’ll add us instead of replacing and update README as well

Developer("TimWSpence", "Tim Spence", "", url("https://github.com/TimWSpence")),
Developer("bastewart", "Ben Stewart", "", url("https://github.com/bastewart")),
Developer("travisbrown", "Travis Brown", "", url("https://twitter.com/travisbrown"))
Developer("travisbrown", "Travis Brown", "", url("https://twitter.com/travisbrown")),
Developer("Snowplow Analytics Ltd", "Snowplow Analytics Ltd", "[email protected]", url("https://snowplowanalytics.com"))
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

import cats.effect.{Blocker, ContextShift, Resource, Sync}
import cats.syntax.all._
import com.google.api.core.ApiService
import com.google.api.gax.batching.FlowControlSettings
import com.google.common.util.concurrent.MoreExecutors
import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import com.permutive.pubsub.consumer.{Model => PublicModel}
Expand All @@ -20,13 +22,13 @@ private[consumer] object PubsubSubscriber {
config: PubsubGoogleConsumerConfig[F]
)(implicit
F: Sync[F]
): Resource[F, BlockingQueue[Model.Record[F]]] =
Resource[F, BlockingQueue[Model.Record[F]]] {
): Resource[F, BlockingQueue[Either[Throwable, Model.Record[F]]]] =
Resource[F, BlockingQueue[Either[Throwable, Model.Record[F]]]] {
Sync[F].delay {
val messages = new LinkedBlockingQueue[Model.Record[F]](config.maxQueueSize)
val messages = new LinkedBlockingQueue[Either[Throwable, Model.Record[F]]](config.maxQueueSize)
val receiver = new MessageReceiver {
override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit =
messages.put(Model.Record(message, Sync[F].delay(consumer.ack()), Sync[F].delay(consumer.nack())))
messages.put(Right(Model.Record(message, Sync[F].delay(consumer.ack()), Sync[F].delay(consumer.nack()))))
}
val subscriptionName = ProjectSubscriptionName.of(projectId.value, subscription.value)

Expand All @@ -49,6 +51,7 @@ private[consumer] object PubsubSubscriber {
.map(f => f(builder))
.getOrElse(builder)
.build()
sub.addListener(new PubsubErrorListener(messages), MoreExecutors.directExecutor)

val service = sub.startAsync()
val shutdown =
Expand All @@ -60,6 +63,11 @@ private[consumer] object PubsubSubscriber {
}
}

class PubsubErrorListener[R](messages: BlockingQueue[Either[Throwable, R]]) extends ApiService.Listener {
override def failed(from: ApiService.State, failure: Throwable): Unit =
messages.put(Left(failure))
}

def subscribe[F[_]: Sync: ContextShift](
blocker: Blocker,
projectId: PublicModel.ProjectId,
Expand All @@ -68,6 +76,7 @@ private[consumer] object PubsubSubscriber {
): Stream[F, Model.Record[F]] =
for {
queue <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config))
msg <- Stream.repeatEval(blocker.delay(queue.take()))
next <- Stream.repeatEval(blocker.delay(queue.take()))
msg <- next.fold(Stream.raiseError(_), Stream.emit(_))
} yield msg
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.13
sbt.version=1.5.5
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.7.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("io.chrisdavenport" % "sbt-mima-version-check" % "0.1.2")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7")
1 change: 0 additions & 1 deletion version.sbt

This file was deleted.