diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d834720c..05a3cc53 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,8 +8,10 @@ on:
 
 jobs:
   test:
-    runs-on: ubuntu-22.04
+    runs-on: ubuntu-latest
     steps:
+    - name: Install sbt
+      uses: sbt/setup-sbt@v1
    - uses: actions/checkout@v2
    - uses: coursier/cache-action@v6
    - name: Set up JDK 11
@@ -24,18 +26,56 @@ jobs:
   publish_docker:
     needs: test
     if: github.ref_type == 'tag'
-    runs-on: ubuntu-22.04
+    runs-on: ubuntu-latest
     strategy:
       matrix:
         sbtProject:
+          - azure
           - gcp
+          - aws
+          - awsHudi
+          - gcpHudi
+          - azureHudi
+          - gcpBiglake
         include:
+          - sbtProject: azure
+            runSnyk: true
+            targetDir: "modules/azure/target"
+            dockerSuffix: azure
+            dockerTagSuffix: ""
           - sbtProject: gcp
             runSnyk: true
             targetDir: "modules/gcp/target"
             dockerSuffix: gcp
             dockerTagSuffix: ""
+          - sbtProject: aws
+            runSnyk: true
+            targetDir: "modules/aws/target"
+            dockerSuffix: aws
+            dockerTagSuffix: ""
+          - sbtProject: azureHudi
+            runSnyk: false
+            targetDir: "packaging/hudi/target/azure"
+            dockerSuffix: azure
+            dockerTagSuffix: "-hudi"
+          - sbtProject: gcpHudi
+            runSnyk: false
+            targetDir: "packaging/hudi/target/gcp"
+            dockerSuffix: gcp
+            dockerTagSuffix: "-hudi"
+          - sbtProject: awsHudi
+            runSnyk: false
+            targetDir: "packaging/hudi/target/aws"
+            dockerSuffix: aws
+            dockerTagSuffix: "-hudi"
+          - sbtProject: gcpBiglake
+            runSnyk: false
+            targetDir: "packaging/biglake/target/gcp"
+            dockerSuffix: gcp
+            dockerTagSuffix: "-biglake"
     steps:
+    - name: Install sbt
+      uses: sbt/setup-sbt@v1
    - name: Checkout Github
      uses: actions/checkout@v2
    - uses: coursier/cache-action@v6
diff --git a/modules/gcp/src/main/resources/application.conf b/modules/gcp/src/main/resources/application.conf
index b2ff894a..6813e20c 100644
--- a/modules/gcp/src/main/resources/application.conf
+++ b/modules/gcp/src/main/resources/application.conf
@@ -15,6 +15,8 @@
     "durationPerAckExtension": "10 minutes"
     "minRemainingDeadline": 0.1
     "maxMessagesPerPull": 1000
+    "debounceRequests": "100 millis"
+    "prefetch": 4
   }
   "output": {
     "bad": ${snowplow.defaults.sinks.pubsub}
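
Note on the two settings added above: `debounceRequests` rate-limits how often the source issues a `PullRequest`, and `prefetch` bounds how many pulled batches may be buffered ahead of the downstream consumer. A back-of-envelope ceiling implied by the defaults, as a sketch (the `PullBudget` object and its values are illustrative assumptions, not part of the patch):

```scala
import scala.concurrent.duration._

// Illustrative ceiling implied by the defaults above: the pull loop
// emits at most one tick per debounce period (dampen = true drops
// missed ticks), and each tick triggers at most one PullRequest.
object PullBudget {
  val debounceRequests: FiniteDuration = 100.millis
  val maxMessagesPerPull: Int          = 1000

  val maxPullsPerSecond: Double    = 1000.0 / debounceRequests.toMillis    // 10 pulls/s
  val maxMessagesPerSecond: Double = maxPullsPerSecond * maxMessagesPerPull // 10,000 msgs/s

  def main(args: Array[String]): Unit =
    println(s"$maxPullsPerSecond pulls/s, up to $maxMessagesPerSecond messages/s")
}
```

Parallelism (`parEvalMapUnordered` below) only keeps that rate achievable when individual pulls are slow; it does not raise the ceiling.
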
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala
index 498a9681..376caddc 100644
--- a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala
@@ -17,12 +17,8 @@ import java.time.Instant
  *   in the future. This is updated over time if we approach a deadline.
  * @param ackIds
  *   The IDs which are needed to ack all messages in the batch
- * @param channelAffinity
- *   Corresponds to the GRPC channel (TCP connection) on which this batch was pulled. We ack and
- *   modack on the same channel from where the messages came.
  */
 private case class PubsubBatchState(
   currentDeadline: Instant,
-  ackIds: Vector[String],
-  channelAffinity: Int
+  ackIds: Vector[String]
 )
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala
index 631586d9..28a851d2 100644
--- a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala
@@ -7,13 +7,11 @@
  */
 package com.snowplowanalytics.snowplow.sources.pubsub.v2
 
-import cats.effect.implicits._
 import cats.implicits._
 import cats.effect.kernel.Unique
 import cats.effect.{Async, Deferred, Ref, Sync}
 
 import com.google.cloud.pubsub.v1.stub.SubscriberStub
 import com.google.pubsub.v1.AcknowledgeRequest
-import com.google.api.gax.grpc.GrpcCallContext
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -58,21 +56,17 @@ class PubsubCheckpointer[F[_]: Async](
     for {
       Resources(stub, refAckIds) <- deferredResources.get
       ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
-      grouped = ackDatas.groupBy(_.channelAffinity)
-      _ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
-            ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
-              val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
-              val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
-              val attempt = for {
-                apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request, context))
-                _ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
-              } yield ()
-              attempt.retryingOnTransientGrpcFailures
-                .recoveringOnGrpcInvalidArgument { s =>
-                  // This can happen if ack IDs have expired before we acked
-                  Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
-                }
-            }
+      _ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
+             val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
+             val attempt = for {
+               apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request))
+               _ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
+             } yield ()
+             attempt.retryingOnTransientGrpcFailures
+               .recoveringOnGrpcInvalidArgument { s =>
+                 // This can happen if ack IDs have expired before we acked
+                 Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
+               }
           }
     } yield ()
 
@@ -86,12 +80,9 @@ class PubsubCheckpointer[F[_]: Async](
     for {
       Resources(stub, refAckIds) <- deferredResources.get
       ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
-      grouped = ackDatas.groupBy(_.channelAffinity)
-      _ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
-            val ackIds = ackDatas.flatMap(_.ackIds)
-            // A nack is just a modack with zero duration
-            Utils.modAck[F](subscription, stub, ackIds, Duration.Zero, channelAffinity)
-          }
+      ackIds = ackDatas.flatMap(_.ackIds)
+      // A nack is just a modack with zero duration
+      _ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero)
     } yield ()
 
 }
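
The checkpointer change drops the per-channel `parTraverse_` but keeps the batching: ack IDs still go out in groups of at most 1000 per request, and a nack remains a modack with zero duration. A minimal sketch of that batching pattern (`sendBatch` is a hypothetical stand-in for the stub call):

```scala
import cats.effect.IO
import cats.implicits._

// Sketch of the batching pattern used by `ack` and nack above: split
// the ack IDs into groups of at most 1000 and send each group as one
// request, sequentially now that channel affinity is gone.
object AckBatching {
  def ackAll(ackIds: Vector[String])(sendBatch: Vector[String] => IO[Unit]): IO[Unit] =
    ackIds.grouped(1000).toVector.traverse_(sendBatch)

  // A nack reuses the same shape: per the patch it is a
  // ModifyAckDeadline request with Duration.Zero.
}
```
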
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala
index 11478e87..87b8b286 100644
--- a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala
@@ -23,8 +23,9 @@ case class PubsubSourceConfigV2(
   durationPerAckExtension: FiniteDuration,
   minRemainingDeadline: Double,
   gcpUserAgent: GcpUserAgent,
-  maxPullsPerTransportChannel: Int,
-  maxMessagesPerPull: Int
+  maxMessagesPerPull: Int,
+  debounceRequests: FiniteDuration,
+  prefetch: Int
 )
 
 object PubsubSourceConfigV2 {
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala
index f72b2acf..79cca6af 100644
--- a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala
@@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.sources.pubsub.v2
 import cats.effect.{Async, Deferred, Ref, Resource, Sync}
 import cats.effect.kernel.Unique
 import cats.implicits._
-import fs2.{Chunk, Pipe, Stream}
+import fs2.{Chunk, Stream}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
@@ -18,7 +18,7 @@ import java.time.Instant
 
 // pubsub
 import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
-import com.google.api.gax.grpc.{ChannelPoolSettings, GrpcCallContext}
+import com.google.api.gax.grpc.ChannelPoolSettings
 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
 import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings
 import com.google.pubsub.v1.{PullRequest, PullResponse}
@@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
 import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._
 
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.jdk.CollectionConverters._
 
 import java.util.concurrent.{ExecutorService, Executors}
@@ -65,67 +65,72 @@ object PubsubSourceV2 {
   ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
     for {
       parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config)))
-      channelCount = chooseNumTransportChannels(config, parallelPullCount)
-      stub <- Stream.resource(stubResource(config, channelCount))
+      stub <- Stream.resource(stubResource(config))
       refStates <- Stream.eval(Ref[F].of(Map.empty[Unique.Token, PubsubBatchState]))
       _ <- Stream.eval(deferredResources.complete(PubsubCheckpointer.Resources(stub, refStates)))
     } yield Stream
-      .range(0, parallelPullCount)
-      .map(i => miniPubsubStream(config, stub, refStates, i))
-      .parJoinUnbounded
+      .fixedRateStartImmediately(config.debounceRequests, dampen = true)
+      .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
+      .unNone
+      .repeat
+      .prefetchN(config.prefetch)
+      .concurrently(extendDeadlines(config, stub, refStates))
+      .onFinalize(nackRefStatesForShutdown(config, stub, refStates))
 
-  private def miniPubsubStream[F[_]: Async](
+  private def pullAndManageState[F[_]: Async](
     config: PubsubSourceConfigV2,
     stub: SubscriberStub,
-    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
-    channelAffinity: Int
-  ): Stream[F, LowLevelEvents[Vector[Unique.Token]]] =
-    Stream
-      .eval[F, PullResponse](pullFromSubscription(config, stub, channelAffinity))
-      .filter(_.getReceivedMessagesCount > 0)
-      .through(addToRefStates(config, stub, refStates, channelAffinity))
-      .repeat
-      .prefetch
-      .concurrently(extendDeadlines(config, stub, refStates, channelAffinity))
+    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
+  ): F[Option[LowLevelEvents[Vector[Unique.Token]]]] =
+    pullFromSubscription(config, stub).flatMap { response =>
+      if (response.getReceivedMessagesCount > 0) {
+        val records = response.getReceivedMessagesList.asScala.toVector
+        val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
+        val (tstampSeconds, tstampNanos) =
+          records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
+        val ackIds = records.map(_.getAckId)
+        Sync[F].uncancelable { _ =>
+          for {
+            timeReceived <- Sync[F].realTimeInstant
+            _ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension)
+            token <- Unique[F].unique
+            currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
+            _ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds)))
+          } yield Some(LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong))))
+        }
+      } else {
+        none.pure[F]
+      }
+    }
 
-  private def pullFromSubscription[F[_]: Async](
+  private def nackRefStatesForShutdown[F[_]: Async](
     config: PubsubSourceConfigV2,
     stub: SubscriberStub,
-    channelAffinity: Int
+    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
+  ): F[Unit] =
+    refStates.getAndSet(Map.empty).flatMap { m =>
+      Utils.modAck(config.subscription, stub, m.values.flatMap(_.ackIds.toVector).toVector, Duration.Zero)
+    }
+
+  private def pullFromSubscription[F[_]: Async](
+    config: PubsubSourceConfigV2,
+    stub: SubscriberStub
   ): F[PullResponse] = {
-    val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
     val request = PullRequest.newBuilder
       .setSubscription(config.subscription.show)
       .setMaxMessages(config.maxMessagesPerPull)
       .build
     val io = for {
-      apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request, context))
+      apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request))
       res <- FutureInterop.fromFuture[F, PullResponse](apiFuture)
     } yield res
-    io.retryingOnTransientGrpcFailures
+    Logger[F].trace("Pulling from subscription") *>
+      io.retryingOnTransientGrpcFailures
+        .flatTap { response =>
+          Logger[F].trace(s"Pulled ${response.getReceivedMessagesCount} messages")
+        }
   }
 
-  private def addToRefStates[F[_]: Async](
-    config: PubsubSourceConfigV2,
-    stub: SubscriberStub,
-    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
-    channelAffinity: Int
-  ): Pipe[F, PullResponse, LowLevelEvents[Vector[Unique.Token]]] =
-    _.evalMap { response =>
-      val records = response.getReceivedMessagesList.asScala.toVector
-      val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
-      val (tstampSeconds, tstampNanos) =
-        records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
-      val ackIds = records.map(_.getAckId)
-      for {
-        timeReceived <- Sync[F].realTimeInstant
-        _ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
-        token <- Unique[F].unique
-        currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
-        _ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds, channelAffinity)))
-      } yield LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)))
-    }
 
   /**
    * Modify ack deadlines if we need more time to process the messages
    *
@@ -136,15 +141,11 @@ object PubsubSourceV2 {
    * @param refStates
    *   A map from tokens to the data held about a batch of messages received from pubsub. This
    *   function must update the state if it extends a deadline.
-   * @param channelAffinity
-   *   Identifies the GRPC channel (TCP connection) creating these Actions. Each GRPC channel has
-   *   its own concurrent stream modifying the ack deadlines.
    */
   private def extendDeadlines[F[_]: Async](
     config: PubsubSourceConfigV2,
     stub: SubscriberStub,
-    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
-    channelAffinity: Int
+    refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
   ): Stream[F, Nothing] =
     Stream
       .eval(Sync[F].realTimeInstant)
@@ -153,7 +154,7 @@ object PubsubSourceV2 {
         val newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis)
         refStates.modify { m =>
           val toExtend = m.filter { case (_, batchState) =>
-            batchState.channelAffinity === channelAffinity && batchState.currentDeadline.isBefore(minAllowedDeadline)
+            batchState.currentDeadline.isBefore(minAllowedDeadline)
           }
           val fixed = toExtend.view
             .mapValues(_.copy(currentDeadline = newDeadline))
@@ -166,24 +167,22 @@ object PubsubSourceV2 {
             Sync[F].sleep(0.5 * config.minRemainingDeadline * config.durationPerAckExtension)
           else {
             val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds)
-            Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
+            Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension)
           }
       }
       .repeat
       .drain
 
   private def stubResource[F[_]: Async](
-    config: PubsubSourceConfigV2,
-    channelCount: Int
+    config: PubsubSourceConfigV2
   ): Resource[F, SubscriberStub] =
     for {
       executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2)))
-      subStub <- buildSubscriberStub(config, channelCount, FixedExecutorProvider.create(executor))
+      subStub <- buildSubscriberStub(config, FixedExecutorProvider.create(executor))
     } yield subStub
 
   private def buildSubscriberStub[F[_]: Sync](
     config: PubsubSourceConfigV2,
-    channelCount: Int,
     executorProvider: ExecutorProvider
   ): Resource[F, GrpcSubscriberStub] = {
     val channelProvider = SubscriptionAdminSettings
@@ -192,7 +191,7 @@ object PubsubSourceV2 {
       .setMaxInboundMetadataSize(20 << 20)
       .setKeepAliveTime(ThreetenDuration.ofMinutes(5))
       .setChannelPoolSettings {
-        ChannelPoolSettings.staticallySized(channelCount)
+        ChannelPoolSettings.staticallySized(1)
       }
       .build
 
@@ -223,16 +222,4 @@ object PubsubSourceV2 {
       .setScale(0, BigDecimal.RoundingMode.UP)
       .toInt
 
-  /**
-   * Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection)
-   *
-   * GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy
-   * to stay much under that limit. If we need to open a large number of streaming pulls then we
-   * might approach/exceed that limit.
-   */
-  private def chooseNumTransportChannels(config: PubsubSourceConfigV2, parallelPullCount: Int): Int =
-    (BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel)
-      .setScale(0, BigDecimal.RoundingMode.UP)
-      .toInt
-
 }
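
The heart of this file's change is the new pull-loop shape: the per-channel `miniPubsubStream`s joined with `parJoinUnbounded` are replaced by one debounced tick stream fanned out over a bounded `parEvalMapUnordered`. A self-contained sketch of that shape (fs2 3.x assumed; `pull` is a hypothetical stand-in for `pullAndManageState`, which returns `None` for empty responses):

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

// Standalone sketch of the new pull-loop shape introduced above.
object PullLoopShape extends IOApp.Simple {
  val debounceRequests  = 100.millis
  val parallelPullCount = 3
  val prefetch          = 4

  // Hypothetical stand-in for pullAndManageState.
  def pull: IO[Option[String]] = IO.pure(Some("batch"))

  val run: IO[Unit] =
    Stream
      .fixedRateStartImmediately[IO](debounceRequests, dampen = true) // rate-limit pull requests
      .parEvalMapUnordered(parallelPullCount)(_ => pull)              // bounded concurrent pulls
      .unNone                                                         // drop empty responses
      .prefetchN(prefetch)                                            // buffer ahead of downstream
      .take(5)
      .foreach(IO.println)
      .compile
      .drain
}
```

With a single statically sized channel (`ChannelPoolSettings.staticallySized(1)` above), the per-channel affinity bookkeeping this patch removes has nothing left to distinguish.
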
diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala
index 97b958c9..59d273a1 100644
--- a/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala
+++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/Utils.scala
@@ -11,7 +11,6 @@ import cats.effect.{Async, Sync}
 import cats.implicits._
 import org.typelevel.log4cats.Logger
 
-import com.google.api.gax.grpc.GrpcCallContext
 import com.google.cloud.pubsub.v1.stub.SubscriberStub
 import com.google.pubsub.v1.ModifyAckDeadlineRequest
 import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._
@@ -25,8 +24,7 @@ private object Utils {
     subscription: PubsubSourceConfigV2.Subscription,
     stub: SubscriberStub,
     ackIds: Vector[String],
-    duration: FiniteDuration,
-    channelAffinity: Int
+    duration: FiniteDuration
   ): F[Unit] =
     ackIds.grouped(1000).toVector.traverse_ { group =>
       val request = ModifyAckDeadlineRequest.newBuilder
@@ -34,9 +32,8 @@ private object Utils {
         .setSubscription(subscription.show)
         .addAllAckIds(group.asJava)
         .setAckDeadlineSeconds(duration.toSeconds.toInt)
         .build
-      val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
       val io = for {
-        apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request, context))
+        apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request))
        _ <- FutureInterop.fromFuture_(apiFuture)
      } yield ()
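
For reference, the deadline arithmetic that `extendDeadlines` now applies across all batches (previously per channel), worked through with the defaults from `application.conf` above. This is a sketch under those assumptions; the threshold interpretation follows the `minRemainingDeadline` parameter docs and the `0.5 * minRemainingDeadline * durationPerAckExtension` sleep shown in the patch:

```scala
import scala.concurrent.duration._

// Worked example with durationPerAckExtension = 10 minutes and
// minRemainingDeadline = 0.1, as in application.conf above.
object DeadlineArithmetic {
  val durationPerAckExtension: FiniteDuration = 10.minutes
  val minRemainingDeadline: Double            = 0.1

  // A batch is extended once less than this much of its deadline remains.
  val extendThreshold: Duration = durationPerAckExtension * minRemainingDeadline // 1 minute

  // When nothing needed extending, the loop sleeps this long before re-checking.
  val idleSleep: Duration = extendThreshold * 0.5 // 30 seconds

  def main(args: Array[String]): Unit =
    println(s"extend when < $extendThreshold remains; idle re-check every $idleSleep")
}
```
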