diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index df38fec..05a3cc5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,8 @@ jobs: test: runs-on: ubuntu-latest steps: + - name: Install sbt + uses: sbt/setup-sbt@v1 - uses: actions/checkout@v2 - uses: coursier/cache-action@v6 - name: Set up JDK 11 @@ -72,6 +74,8 @@ jobs: dockerSuffix: gcp dockerTagSuffix: "-biglake" steps: + - name: Install sbt + uses: sbt/setup-sbt@v1 - name: Checkout Github uses: actions/checkout@v2 - uses: coursier/cache-action@v6 diff --git a/modules/gcp/src/main/resources/application.conf b/modules/gcp/src/main/resources/application.conf index b33463c..2384b8a 100644 --- a/modules/gcp/src/main/resources/application.conf +++ b/modules/gcp/src/main/resources/application.conf @@ -10,6 +10,14 @@ "input": ${snowplow.defaults.sources.pubsub} "input": { "gcpUserAgent": ${gcpUserAgent} + + // V2 defaults + "durationPerAckExtension": "10 minutes" + "minRemainingDeadline": 0.1 + "maxMessagesPerPull": 1000 + "debounceRequests": "100 millis" + "prefetch": 4 + "logMessageIds": false } "output": { "bad": ${snowplow.defaults.sinks.pubsub} diff --git a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala index 19ded7a..e3d40df 100644 --- a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala +++ b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala @@ -14,12 +14,16 @@ import cats.implicits._ import com.google.api.client.googleapis.json.GoogleJsonResponseException -import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig} +import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceAlternative} +import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubSourceV2 import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig} -object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo) { +object GcpApp extends LoaderApp[PubsubSourceAlternative, PubsubSinkConfig](BuildInfo) { - override def source: SourceProvider = PubsubSource.build(_) + override def source: SourceProvider = { + case PubsubSourceAlternative.V1(c) => PubsubSource.build(c) + case PubsubSourceAlternative.V2(c) => PubsubSourceV2.build(c) + } override def badSink: SinkProvider = PubsubSink.resource(_) diff --git a/modules/gcp/src/main/scala/common-streams-extensions/PubsubSourceAlternative.scala b/modules/gcp/src/main/scala/common-streams-extensions/PubsubSourceAlternative.scala new file mode 100644 index 0000000..d6545f5 --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/PubsubSourceAlternative.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import io.circe.Decoder +import cats.implicits._ + +import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubSourceConfigV2 + +/** + * Allows experimental support for the V2 source, while loading the V1 source by default + * + * Users can select the v2 Source by setting `"version": "v2"` in the hocon file + */ +sealed trait PubsubSourceAlternative + +object PubsubSourceAlternative { + case class V1(config: PubsubSourceConfig) extends PubsubSourceAlternative + case class V2(config: PubsubSourceConfigV2) extends PubsubSourceAlternative + + implicit def decoder: Decoder[PubsubSourceAlternative] = Decoder.decodeJsonObject.flatMap { + case obj if obj("version").flatMap(_.asString) === Some("v2") => + implicitly[Decoder[PubsubSourceConfigV2]].map(V2(_)) + case _ => + implicitly[Decoder[PubsubSourceConfig]].map(V1(_)) + } +} diff --git a/modules/gcp/src/main/scala/common-streams-extensions/README.md b/modules/gcp/src/main/scala/common-streams-extensions/README.md new file mode 100644 index 0000000..d42f719 --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/README.md @@ -0,0 +1,3 @@ +### common-streams-extensions + +Code in this directory is destined to be migrated to common-streams. diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/FutureInterop.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/FutureInterop.scala new file mode 100644 index 0000000..39f6079 --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/FutureInterop.scala @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub.v2 + +import cats.effect.Async +import cats.implicits._ +import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures} +import com.google.common.util.concurrent.MoreExecutors + +object FutureInterop { + def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] = + Async[F] + .async[A] { cb => + val cancel = Async[F].delay { + fut.cancel(false) + }.void + Async[F].delay { + addCallback(fut, cb) + Some(cancel) + } + } + + def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] = + fromFuture(fut).void + + private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit = { + val apiFutureCallback = new ApiFutureCallback[A] { + def onFailure(t: Throwable): Unit = cb(Left(t)) + def onSuccess(result: A): Unit = cb(Right(result)) + } + ApiFutures.addCallback(fut, apiFutureCallback, MoreExecutors.directExecutor) + } + +} diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala new file mode 100644 index 0000000..376cadd --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub.v2 + +import java.time.Instant + +/** + * Data held about a batch of messages pulled from a pubsub subscription + * + * @param currentDeadline + * The deadline before which we must either ack, nack, or extend the deadline to something further + * in the future. This is updated over time if we approach a deadline. + * @param ackIds + * The IDs which are needed to ack all messages in the batch + */ +private case class PubsubBatchState( + currentDeadline: Instant, + ackIds: Vector[String] +) diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala new file mode 100644 index 0000000..28a851d --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub.v2 + +import cats.implicits._ +import cats.effect.kernel.Unique +import cats.effect.{Async, Deferred, Ref, Sync} +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.AcknowledgeRequest +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration.Duration + +import com.snowplowanalytics.snowplow.sources.internal.Checkpointer +import com.snowplowanalytics.snowplow.pubsub.FutureInterop +import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._ + +/** + * The Pubsub checkpointer + * + * @param subscription + * Pubsub subscription name + * @param deferredResources + * Resources needed so we can ack/nack messages. This is wrapped in `Deferred` because the + * resources are not available until the application calls `.stream` on the `LowLevelSource`. This + * is a limitation in the design of the common-streams Source interface. 
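+ *
+ * `PubsubSourceV2` wires this up roughly as follows (a sketch, not the exact code):
+ * {{{
+ * for {
+ *   deferred <- Deferred[F, PubsubCheckpointer.Resources[F]]
+ *   checkpointer = new PubsubCheckpointer[F](config.subscription, deferred)
+ *   // later, inside the source's stream, once the stub and state exist:
+ *   _ <- deferred.complete(PubsubCheckpointer.Resources(stub, refStates))
+ * } yield checkpointer
+ * }}}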
+ */ +class PubsubCheckpointer[F[_]: Async]( + subscription: PubsubSourceConfigV2.Subscription, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] +) extends Checkpointer[F, Vector[Unique.Token]] { + + import PubsubCheckpointer._ + + private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F] + + override def combine(x: Vector[Unique.Token], y: Vector[Unique.Token]): Vector[Unique.Token] = + x |+| y + + override val empty: Vector[Unique.Token] = Vector.empty + + /** + * Ack some batches of messages received from pubsub + * + * @param c + * tokens which are keys to batch data held in the shared state + */ + override def ack(c: Vector[Unique.Token]): F[Unit] = + for { + Resources(stub, refAckIds) <- deferredResources.get + ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get))) + _ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds => + val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build + val attempt = for { + apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request)) + _ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture) + } yield () + attempt.retryingOnTransientGrpcFailures + .recoveringOnGrpcInvalidArgument { s => + // This can happen if ack IDs have expired before we acked + Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}") + } + } + } yield () + + /** + * Nack some batches of messages received from pubsub + * + * @param c + * tokens which are keys to batch data held in the shared state + */ + override def nack(c: Vector[Unique.Token]): F[Unit] = + for { + Resources(stub, refAckIds) <- deferredResources.get + ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get))) + ackIds = ackDatas.flatMap(_.ackIds) + // A nack is just a modack with zero duration + _ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero) + } yield () +} + +private object PubsubCheckpointer { + + /** + * Resources needed by `PubsubCheckpointer` so it can ack/nack messages + * + * @param stub + * The GRPC stub needed to execute the ack/nack RPCs + * @param refState + * A map from tokens to the data held about a batch of messages received from pubsub. The map is + * wrapped in a `Ref` because it is concurrently modified by the source adding new batches to + * the state. + */ + case class Resources[F[_]](stub: SubscriberStub, refState: Ref[F, Map[Unique.Token, PubsubBatchState]]) + +} diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubRetryOps.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubRetryOps.scala new file mode 100644 index 0000000..a9274fe --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubRetryOps.scala @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub.v2 + +import cats.implicits._ +import cats.effect.Async +import com.google.api.gax.rpc.{ApiException, StatusCode} +import io.grpc.Status +import org.typelevel.log4cats.Logger +import retry.RetryPolicies +import retry.implicits._ + +import scala.concurrent.duration.DurationDouble + +private object PubsubRetryOps { + + object implicits { + implicit class Ops[F[_], A](val f: F[A]) extends AnyVal { + + def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] = + f.retryingOnSomeErrors( + isWorthRetrying = { e => isRetryableException(e).pure[F] }, + policy = RetryPolicies.fullJitter(1.second), + onError = { case (t, _) => + Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}") + } + ) + + def recoveringOnGrpcInvalidArgument(f2: Status => F[A])(implicit F: Async[F]): F[A] = + f.recoverWith { + case StatusFromThrowable(s) if s.getCode.equals(Status.Code.INVALID_ARGUMENT) => + f2(s) + } + } + } + + private object StatusFromThrowable { + def unapply(t: Throwable): Option[Status] = + Some(Status.fromThrowable(t)) + } + + def isRetryableException: Throwable => Boolean = { + case apiException: ApiException => + apiException.getStatusCode.getCode match { + case StatusCode.Code.DEADLINE_EXCEEDED => true + case StatusCode.Code.INTERNAL => true + case StatusCode.Code.CANCELLED => true + case StatusCode.Code.RESOURCE_EXHAUSTED => true + case StatusCode.Code.ABORTED => true + case StatusCode.Code.UNKNOWN => true + case StatusCode.Code.UNAVAILABLE => !apiException.getMessage().contains("Server shutdownNow invoked") + case _ => false + } + case _ => + false + } +} diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala new file mode 100644 index 0000000..b266134 --- /dev/null +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.sources.pubsub.v2
+
+import cats.Show
+import io.circe.Decoder
+import io.circe.generic.semiauto._
+import io.circe.config.syntax._
+import com.google.pubsub.v1.ProjectSubscriptionName
+
+import scala.concurrent.duration.FiniteDuration
+
+import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
+
+case class PubsubSourceConfigV2(
+  subscription: PubsubSourceConfigV2.Subscription,
+  parallelPullFactor: BigDecimal,
+  durationPerAckExtension: FiniteDuration,
+  minRemainingDeadline: Double,
+  gcpUserAgent: GcpUserAgent,
+  maxMessagesPerPull: Int,
+  debounceRequests: FiniteDuration,
+  prefetch: Int,
+  logMessageIds: PubsubSourceConfigV2.CustomBoolean
+)
+
+object PubsubSourceConfigV2 {
+
+  case class Subscription(projectId: String, subscriptionId: String)
+
+  // Accepts both boolean literals and stringly-typed booleans (e.g. "true" from an env var)
+  case class CustomBoolean(value: Boolean) extends AnyVal
+
+  object CustomBoolean {
+    implicit def decoder: Decoder[CustomBoolean] =
+      Decoder.decodeBoolean
+        .or(Decoder.decodeString.emapTry(s => scala.util.Try(s.toBoolean)))
+        .map(CustomBoolean(_))
+  }
+
+  object Subscription {
+    implicit def show: Show[Subscription] = Show.show[Subscription] { s =>
+      ProjectSubscriptionName.of(s.projectId, s.subscriptionId).toString
+    }
+  }
+
+  private implicit def subscriptionDecoder: Decoder[Subscription] =
+    Decoder.decodeString
+      .map(_.split("/"))
+      .emap {
+        case Array("projects", projectId, "subscriptions", subscriptionId) =>
+          Right(Subscription(projectId, subscriptionId))
+        case _ =>
+          Left("Expected format: projects/<project>/subscriptions/<subscription>")
+      }
+
+  implicit def decoder: Decoder[PubsubSourceConfigV2] = deriveDecoder[PubsubSourceConfigV2]
+}
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala
new file mode 100644
index 0000000..0e307e2
--- /dev/null
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala
@@ -0,0 +1,229 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub.v2 + +import cats.effect.{Async, Deferred, Ref, Resource, Sync} +import cats.effect.kernel.Unique +import cats.implicits._ +import fs2.{Chunk, Stream} +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import java.time.Instant + +// pubsub +import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider} +import com.google.api.gax.grpc.ChannelPoolSettings +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings +import com.google.pubsub.v1.{PullRequest, PullResponse} +import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStub} +import org.threeten.bp.{Duration => ThreetenDuration} + +// snowplow +import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent +import com.snowplowanalytics.snowplow.sources.SourceAndAck +import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} +import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._ + +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.jdk.CollectionConverters._ + +import java.util.concurrent.{ExecutorService, Executors} + +object PubsubSourceV2 { + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + + def build[F[_]: Async](config: PubsubSourceConfigV2): F[SourceAndAck[F]] = + Deferred[F, PubsubCheckpointer.Resources[F]].flatMap { deferred => + LowLevelSource.toSourceAndAck(lowLevel(config, deferred)) + } + + private def lowLevel[F[_]: Async]( + config: PubsubSourceConfigV2, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] + ): LowLevelSource[F, Vector[Unique.Token]] = + new LowLevelSource[F, Vector[Unique.Token]] { + def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources) + + def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + pubsubStream(config, deferredResources) + + def lastLiveness: F[FiniteDuration] = + Sync[F].realTime + } + + private def pubsubStream[F[_]: Async]( + config: PubsubSourceConfigV2, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] + ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + for { + parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config))) + stub <- Stream.resource(stubResource(config)) + refStates <- Stream.eval(Ref[F].of(Map.empty[Unique.Token, PubsubBatchState])) + _ <- Stream.eval(deferredResources.complete(PubsubCheckpointer.Resources(stub, refStates))) + } yield Stream + .fixedRateStartImmediately(config.debounceRequests, dampen = true) + .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates)) + .unNone + .repeat + .prefetchN(config.prefetch) + .concurrently(extendDeadlines(config, stub, refStates)) + .onFinalize(nackRefStatesForShutdown(config, stub, refStates)) + + private def pullAndManageState[F[_]: Async]( + config: PubsubSourceConfigV2, + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): F[Option[LowLevelEvents[Vector[Unique.Token]]]] = + pullFromSubscription(config, stub).flatMap { response => + if (response.getReceivedMessagesCount > 0) { + val records = response.getReceivedMessagesList.asScala.toVector + val chunk = 
Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer())) + val (tstampSeconds, tstampNanos) = + records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min + val ackIds = records.map(_.getAckId) + Sync[F].uncancelable { _ => + for { + _ <- if (config.logMessageIds.value) Sync[F].delay { + println(records.map(_.getMessage.getMessageId).mkString("Pubsub message IDs: ", ",", "")) + } + else Sync[F].unit + timeReceived <- Sync[F].realTimeInstant + _ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension) + token <- Unique[F].unique + currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis) + _ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds))) + } yield Some(LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)))) + } + } else { + none.pure[F] + } + } + + private def nackRefStatesForShutdown[F[_]: Async]( + config: PubsubSourceConfigV2, + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): F[Unit] = + refStates.getAndSet(Map.empty).flatMap { m => + Utils.modAck(config.subscription, stub, m.values.flatMap(_.ackIds.toVector).toVector, Duration.Zero) + } + + private def pullFromSubscription[F[_]: Async]( + config: PubsubSourceConfigV2, + stub: SubscriberStub + ): F[PullResponse] = { + val request = PullRequest.newBuilder + .setSubscription(config.subscription.show) + .setMaxMessages(config.maxMessagesPerPull) + .build + val io = for { + apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request)) + res <- FutureInterop.fromFuture[F, PullResponse](apiFuture) + } yield res + Logger[F].trace("Pulling from subscription") *> + io.retryingOnTransientGrpcFailures + .flatTap { response => + Logger[F].trace(s"Pulled ${response.getReceivedMessagesCount} messages") + } + } + + /** + * Modify ack deadlines if we need more time to process the messages + * + * @param config + * The Source configuration + * @param stub + * The GRPC stub on which we can issue modack requests + * @param refStates + * A map from tokens to the data held about a batch of messages received from pubsub. This + * function must update the state if it extends a deadline. 
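+ *
+ * For example, with the V2 defaults of durationPerAckExtension = 10 minutes and
+ * minRemainingDeadline = 0.1, a batch becomes eligible for extension once less than 1 minute
+ * (0.1 * 10 minutes) remains before its current deadline, and each extension pushes the
+ * deadline a further 10 minutes into the future.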
+ */
+  private def extendDeadlines[F[_]: Async](
+    config: PubsubSourceConfigV2,
+    stub: SubscriberStub,
+    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
+  ): Stream[F, Nothing] =
+    Stream
+      .eval(Sync[F].realTimeInstant)
+      .evalMap { now =>
+        val minAllowedDeadline = now.plusMillis((config.minRemainingDeadline * config.durationPerAckExtension.toMillis).toLong)
+        val newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis)
+        refStates.modify { m =>
+          val toExtend = m.filter { case (_, batchState) =>
+            batchState.currentDeadline.isBefore(minAllowedDeadline)
+          }
+          val fixed = toExtend.view
+            .mapValues(_.copy(currentDeadline = newDeadline))
+            .toMap
+          (m ++ fixed, toExtend.values.toVector)
+        }
+      }
+      .evalMap { toExtend =>
+        if (toExtend.isEmpty) {
+          // Nothing is close to its deadline; sleep for half the threshold window before checking again
+          val sleepMillis = (0.5 * config.minRemainingDeadline * config.durationPerAckExtension.toMillis).toLong
+          Async[F].sleep(FiniteDuration(sleepMillis, "millis"))
+        } else {
+          val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds)
+          Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension)
+        }
+      }
+      .repeat
+      .drain
+
+  private def stubResource[F[_]: Async](
+    config: PubsubSourceConfigV2
+  ): Resource[F, SubscriberStub] =
+    for {
+      executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2)))
+      subStub <- buildSubscriberStub(config, FixedExecutorProvider.create(executor))
+    } yield subStub
+
+  private def buildSubscriberStub[F[_]: Sync](
+    config: PubsubSourceConfigV2,
+    executorProvider: ExecutorProvider
+  ): Resource[F, GrpcSubscriberStub] = {
+    val channelProvider = SubscriptionAdminSettings
+      .defaultGrpcTransportProviderBuilder()
+      .setMaxInboundMessageSize(20 << 20)
+      .setMaxInboundMetadataSize(20 << 20)
+      .setKeepAliveTime(ThreetenDuration.ofMinutes(5))
+      .setChannelPoolSettings {
+        ChannelPoolSettings.staticallySized(1)
+      }
+      .build
+
+    val stubSettings = SubscriberStubSettings
+      .newBuilder()
+      .setBackgroundExecutorProvider(executorProvider)
+      .setCredentialsProvider(SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build())
+      .setTransportChannelProvider(channelProvider)
+      .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent))
+      .setEndpoint(SubscriberStubSettings.getDefaultEndpoint())
+      .build
+
+    Resource.make(Sync[F].delay(GrpcSubscriberStub.create(stubSettings)))(stub => Sync[F].blocking(stub.shutdownNow))
+  }
+
+  private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] =
+    Resource.make(make)(es => Sync[F].blocking(es.shutdown()))
+
+  /**
+   * Converts `parallelPullFactor` to a suggested number of parallel pulls
+   *
+   * For bigger instances (more cores) the downstream processor can typically process events more
+   * quickly. So the PubSub subscriber needs more parallelism in order to keep downstream saturated
+   * with events.
+   */
+  private def chooseNumParallelPulls(config: PubsubSourceConfigV2): Int =
+    (Runtime.getRuntime.availableProcessors * config.parallelPullFactor)
+      .setScale(0, BigDecimal.RoundingMode.UP)
+      .toInt
+
+}
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala
new file mode 100644
index 0000000..59d273a
--- /dev/null
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub.v2 + +import cats.effect.{Async, Sync} +import cats.implicits._ +import org.typelevel.log4cats.Logger + +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.ModifyAckDeadlineRequest +import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._ + +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ + +private object Utils { + + def modAck[F[_]: Async: Logger]( + subscription: PubsubSourceConfigV2.Subscription, + stub: SubscriberStub, + ackIds: Vector[String], + duration: FiniteDuration + ): F[Unit] = + ackIds.grouped(1000).toVector.traverse_ { group => + val request = ModifyAckDeadlineRequest.newBuilder + .setSubscription(subscription.show) + .addAllAckIds(group.asJava) + .setAckDeadlineSeconds(duration.toSeconds.toInt) + .build + val io = for { + apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request)) + _ <- FutureInterop.fromFuture_(apiFuture) + } yield () + + io.retryingOnTransientGrpcFailures + .recoveringOnGrpcInvalidArgument { s => + // This can happen if ack IDs were acked before we modAcked + Logger[F].info(s"Ignoring error from GRPC when modifying ack IDs: ${s.getDescription}") + } + } + +}
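
Usage note: the V2 source remains opt-in. A minimal input block selecting it would look like the
following sketch (project and subscription names are placeholders; the remaining V2 settings come
from the defaults merged into the input block in application.conf above):

    "input": {
      "version": "v2"
      "subscription": "projects/my-project/subscriptions/my-subscription"
    }

If "version" is absent, or set to anything other than "v2", PubsubSourceAlternative decodes the
block as a V1 PubsubSourceConfig, so existing configurations continue to work unchanged. On sizing:
chooseNumParallelPulls rounds availableProcessors * parallelPullFactor upwards, so for example an
8-core instance with a parallelPullFactor of 0.5 issues up to 4 concurrent pulls.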