Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation of a pubsub source - 2 #86

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install sbt
uses: sbt/setup-sbt@v1
- uses: actions/checkout@v2
- uses: coursier/cache-action@v6
- name: Set up JDK 11
Expand Down Expand Up @@ -72,6 +74,8 @@ jobs:
dockerSuffix: gcp
dockerTagSuffix: "-biglake"
steps:
- name: Install sbt
uses: sbt/setup-sbt@v1
- name: Checkout Github
uses: actions/checkout@v2
- uses: coursier/cache-action@v6
Expand Down
8 changes: 8 additions & 0 deletions modules/gcp/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
"input": ${snowplow.defaults.sources.pubsub}
"input": {
"gcpUserAgent": ${gcpUserAgent}

// V2 defaults
"durationPerAckExtension": "10 minutes"
"minRemainingDeadline": 0.1
"maxMessagesPerPull": 1000
"debounceRequests": "100 millis"
"prefetch": 4
"logMessageIds": false
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ import cats.implicits._

import com.google.api.client.googleapis.json.GoogleJsonResponseException

import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig}
import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceAlternative}
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubSourceV2
import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig}

object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo) {
object GcpApp extends LoaderApp[PubsubSourceAlternative, PubsubSinkConfig](BuildInfo) {

override def source: SourceProvider = PubsubSource.build(_)
override def source: SourceProvider = {
case PubsubSourceAlternative.V1(c) => PubsubSource.build(c)
case PubsubSourceAlternative.V2(c) => PubsubSourceV2.build(c)
}

override def badSink: SinkProvider = PubsubSink.resource(_)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub

import io.circe.Decoder
import cats.implicits._

import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubSourceConfigV2

/**
 * Wraps the configuration of whichever pubsub source implementation the user has selected
 *
 * The V2 source is experimental and opt-in: it is only chosen when the hocon file sets
 * `"version": "v2"` on the source config. Any other value, or no value at all, selects V1.
 */
sealed trait PubsubSourceAlternative

object PubsubSourceAlternative {

  /** The default source, configured by a `PubsubSourceConfig` */
  case class V1(config: PubsubSourceConfig) extends PubsubSourceAlternative

  /** The experimental source, configured by a `PubsubSourceConfigV2` */
  case class V2(config: PubsubSourceConfigV2) extends PubsubSourceAlternative

  /**
   * Peeks at the `version` field of the JSON object to pick a decoder, and then decodes the very
   * same JSON with the V1 or V2 config decoder
   */
  implicit def decoder: Decoder[PubsubSourceAlternative] =
    Decoder.decodeJsonObject.flatMap { obj =>
      val requestedVersion: Option[String] = obj("version").flatMap(_.asString)
      if (requestedVersion === "v2".some)
        Decoder[PubsubSourceConfigV2].map[PubsubSourceAlternative](V2(_))
      else
        Decoder[PubsubSourceConfig].map[PubsubSourceAlternative](V1(_))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### common-streams-extensions

Code in this directory is destined to be migrated to common-streams.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.Async
import cats.implicits._
import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
import com.google.common.util.concurrent.MoreExecutors

object FutureInterop {

  /**
   * Converts a Google `ApiFuture` into an `F`
   *
   * The resulting effect completes when the future completes, with the future's result or failure.
   * If the effect is canceled, then `cancel(false)` is called on the underlying future.
   *
   * @param fut
   *   The future to wait for
   */
  def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] =
    Async[F].async[A] { cb =>
      Async[F].delay[Option[F[Unit]]] {
        addCallback(fut, cb)
        // The finalizer which cats-effect runs if the fiber is canceled while waiting
        val cancelFuture: F[Unit] = Async[F].delay(fut.cancel(false)).void
        Some(cancelFuture)
      }
    }

  /** Same as `fromFuture`, but discards the result of the future */
  def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] =
    fromFuture(fut).void

  /** Registers `cb` to be invoked with the outcome of `fut`, using guava's direct executor */
  private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit =
    ApiFutures.addCallback(
      fut,
      new ApiFutureCallback[A] {
        override def onSuccess(result: A): Unit = cb(Right(result))
        override def onFailure(t: Throwable): Unit = cb(Left(t))
      },
      MoreExecutors.directExecutor()
    )

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import java.time.Instant

/**
* Data held about a batch of messages pulled from a pubsub subscription
*
* Instances are held in the shared state map of the source, keyed by a `Unique.Token`, until the
* batch is acked or nacked.
*
* @param currentDeadline
* The deadline before which we must either ack, nack, or extend the deadline to something further
* in the future. This is updated over time if we approach a deadline.
* @param ackIds
* The IDs which are needed to ack all messages in the batch. The same IDs are used to nack the
* batch, because a nack is sent as a modack with zero duration.
*/
private case class PubsubBatchState(
currentDeadline: Instant,
ackIds: Vector[String]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.implicits._
import cats.effect.kernel.Unique
import cats.effect.{Async, Deferred, Ref, Sync}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.AcknowledgeRequest
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.jdk.CollectionConverters._
import scala.concurrent.duration.Duration

import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import com.snowplowanalytics.snowplow.pubsub.FutureInterop
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._

/**
 * Checkpointer for the V2 pubsub source: acks and nacks batches of messages over GRPC
 *
 * A checkpoint is a vector of tokens. Each token is the key to a batch held in the shared state,
 * from which we look up the pubsub ack IDs.
 *
 * @param subscription
 *   Pubsub subscription name
 * @param deferredResources
 *   The GRPC stub and shared state needed to ack/nack. They arrive via a `Deferred` because they
 *   only exist once the application has called `.stream` on the `LowLevelSource`; this is a
 *   limitation in the design of the common-streams Source interface.
 */
class PubsubCheckpointer[F[_]: Async](
  subscription: PubsubSourceConfigV2.Subscription,
  deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
) extends Checkpointer[F, Vector[Unique.Token]] {

  import PubsubCheckpointer._

  private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F]

  override val empty: Vector[Unique.Token] = Vector.empty

  override def combine(x: Vector[Unique.Token], y: Vector[Unique.Token]): Vector[Unique.Token] =
    x ++ y

  /**
   * Ack some batches of messages received from pubsub
   *
   * The ack IDs of all batches are sent in requests of at most 1000 IDs each.
   *
   * @param c
   *   tokens which are keys to batch data held in the shared state
   */
  override def ack(c: Vector[Unique.Token]): F[Unit] =
    for {
      resources <- deferredResources.get
      batches <- takeBatches(resources, c)
      _ <- batches
             .flatMap(_.ackIds)
             .grouped(1000)
             .toVector
             .traverse_(ids => acknowledge(resources.stub, ids))
    } yield ()

  /**
   * Nack some batches of messages received from pubsub
   *
   * @param c
   *   tokens which are keys to batch data held in the shared state
   */
  override def nack(c: Vector[Unique.Token]): F[Unit] =
    for {
      resources <- deferredResources.get
      batches <- takeBatches(resources, c)
      // A nack is just a modack with zero duration
      _ <- Utils.modAck[F](subscription, resources.stub, batches.flatMap(_.ackIds), Duration.Zero)
    } yield ()

  /**
   * Atomically removes the given tokens from the shared state, returning the batch data of those
   * tokens which were present
   */
  private def takeBatches(resources: Resources[F], tokens: Vector[Unique.Token]): F[Vector[PubsubBatchState]] =
    resources.refState.modify { state =>
      val found = tokens.flatMap(state.get)
      (state.removedAll(tokens), found)
    }

  /** Sends a single acknowledge request, retrying on transient GRPC failures */
  private def acknowledge(stub: SubscriberStub, ackIds: Vector[String]): F[Unit] = {
    val request = AcknowledgeRequest.newBuilder
      .setSubscription(subscription.show)
      .addAllAckIds(ackIds.asJava)
      .build
    val attempt: F[Unit] =
      Sync[F]
        .delay(stub.acknowledgeCallable.futureCall(request))
        .flatMap(apiFuture => FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture))
        .void
    attempt.retryingOnTransientGrpcFailures
      .recoveringOnGrpcInvalidArgument { status =>
        // This can happen if ack IDs have expired before we acked
        Logger[F].info(s"Ignoring error from GRPC when acking: ${status.getDescription}")
      }
  }
}

private object PubsubCheckpointer {

/**
* Resources needed by `PubsubCheckpointer` so it can ack/nack messages
*
* @param stub
* The GRPC stub needed to execute the ack/nack RPCs
* @param refState
* A map from tokens to the data held about a batch of messages received from pubsub. The map is
* wrapped in a `Ref` because it is concurrently modified by the source adding new batches to
* the state. The checkpointer removes a token's entry from the map when that token is acked or
* nacked.
*/
case class Resources[F[_]](stub: SubscriberStub, refState: Ref[F, Map[Unique.Token, PubsubBatchState]])

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.implicits._
import cats.effect.Async
import com.google.api.gax.rpc.{ApiException, StatusCode}
import io.grpc.Status
import org.typelevel.log4cats.Logger
import retry.RetryPolicies
import retry.implicits._

import scala.concurrent.duration.DurationDouble

/**
 * Retry and recovery helpers for effects which call the pubsub GRPC api
 */
private object PubsubRetryOps {

  object implicits {

    /** Adds retry/recovery extension methods to any effect `F[A]` */
    implicit class Ops[F[_], A](val f: F[A]) extends AnyVal {

      /**
       * Re-runs the effect for as long as it fails with an exception accepted by
       * `isRetryableException`, using a full-jitter backoff policy with a base delay of 1 second.
       * Each retried error is logged at info level. Any other exception is raised immediately.
       */
      def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] =
        f.retryingOnSomeErrors(
          isWorthRetrying = { e => isRetryableException(e).pure[F] },
          policy          = RetryPolicies.fullJitter(1.second),
          onError = { case (t, _) =>
            Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}")
          }
        )

      /**
       * Recovers from a failure whose GRPC status code is INVALID_ARGUMENT
       *
       * @param f2
       *   Produces the replacement result, given the GRPC status extracted from the exception
       */
      def recoveringOnGrpcInvalidArgument(f2: Status => F[A])(implicit F: Async[F]): F[A] =
        f.recoverWith {
          case StatusFromThrowable(s) if s.getCode == Status.Code.INVALID_ARGUMENT =>
            f2(s)
        }
    }
  }

  /** Extractor which maps any throwable to a GRPC status, using `Status.fromThrowable` */
  private object StatusFromThrowable {
    def unapply(t: Throwable): Option[Status] =
      Some(Status.fromThrowable(t))
  }

  /**
   * Decides whether an exception represents a transient failure that is worth retrying
   *
   * Only an `ApiException` with one of the listed status codes is retryable. An UNAVAILABLE status
   * is retryable unless its message says the server's `shutdownNow` was invoked.
   */
  def isRetryableException: Throwable => Boolean = {
    case apiException: ApiException =>
      apiException.getStatusCode.getCode match {
        case StatusCode.Code.DEADLINE_EXCEEDED  => true
        case StatusCode.Code.INTERNAL           => true
        case StatusCode.Code.CANCELLED          => true
        case StatusCode.Code.RESOURCE_EXHAUSTED => true
        case StatusCode.Code.ABORTED            => true
        case StatusCode.Code.UNKNOWN            => true
        case StatusCode.Code.UNAVAILABLE =>
          // `getMessage` is null when the exception was built without a message. Wrap it in an
          // Option so a missing message counts as retryable, instead of throwing a
          // NullPointerException from inside the retry predicate.
          !Option(apiException.getMessage).exists(_.contains("Server shutdownNow invoked"))
        case _ => false
      }
    case _ =>
      false
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.Show
import io.circe.Decoder
import io.circe.generic.semiauto._
import io.circe.config.syntax._
import com.google.pubsub.v1.ProjectSubscriptionName

import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

/**
* Configuration of the experimental V2 pubsub source
*
* NOTE(review): the fields are only declared here, not used, so the descriptions marked
* "presumably" are inferred from names and should be confirmed against the source implementation.
*
* @param subscription
* The subscription to pull from. Decoded from a string with the format
* `projects/<project>/subscriptions/<subscription>`
* @param parallelPullFactor
* Presumably scales how many pulls are made in parallel - TODO confirm
* @param durationPerAckExtension
* Presumably how far a batch's ack deadline is pushed forward each time it is extended - TODO
* confirm
* @param minRemainingDeadline
* Presumably the proportion of the deadline that may remain before an extension is requested -
* TODO confirm
* @param gcpUserAgent
* User agent details for GCP api requests
* @param maxMessagesPerPull
* Presumably the maximum number of messages requested in a single pull - TODO confirm
* @param debounceRequests
* Presumably a minimum delay between consecutive requests to pubsub - TODO confirm
* @param prefetch
* Presumably how many batches may be pulled ahead of processing - TODO confirm
* @param logMessageIds
* A flag which may be configured as either a JSON boolean or a string like "true"/"false".
* Presumably enables logging of pubsub message IDs - TODO confirm
*/
case class PubsubSourceConfigV2(
subscription: PubsubSourceConfigV2.Subscription,
parallelPullFactor: BigDecimal,
durationPerAckExtension: FiniteDuration,
minRemainingDeadline: Double,
gcpUserAgent: GcpUserAgent,
maxMessagesPerPull: Int,
debounceRequests: FiniteDuration,
prefetch: Int,
logMessageIds: PubsubSourceConfigV2.CustomBoolean
)

object PubsubSourceConfigV2 {

  /** A pubsub subscription, identified by the project it lives in and its own id */
  case class Subscription(projectId: String, subscriptionId: String)

  object Subscription {

    /** Renders the subscription via `ProjectSubscriptionName` */
    implicit def show: Show[Subscription] =
      Show.show[Subscription] { sub =>
        ProjectSubscriptionName.of(sub.projectId, sub.subscriptionId).toString
      }
  }

  /** A boolean setting which can be written in the config as a boolean or as a string */
  case class CustomBoolean(value: Boolean) extends AnyVal

  object CustomBoolean {

    /** Tries to decode a JSON boolean first, and falls back to parsing a JSON string */
    implicit def decoder: Decoder[CustomBoolean] = {
      val fromJsonBoolean: Decoder[Boolean] = Decoder.decodeBoolean
      val fromJsonString: Decoder[Boolean] =
        Decoder.decodeString.emapTry(raw => scala.util.Try(raw.toBoolean))
      fromJsonBoolean.or(fromJsonString).map(CustomBoolean(_))
    }

  }

  /** Decodes a subscription from its fully qualified name */
  private implicit def subscriptionDecoder: Decoder[Subscription] =
    Decoder.decodeString.emap { raw =>
      raw.split("/") match {
        case Array("projects", projectId, "subscriptions", subscriptionId) =>
          Right(Subscription(projectId, subscriptionId))
        case _ =>
          Left("Expected format: projects/<project>/subscriptions/<subscription>")
      }
    }

  implicit def decoder: Decoder[PubsubSourceConfigV2] = deriveDecoder[PubsubSourceConfigV2]
}
Loading
Loading