Pubsub source v2 de-bounces requests to pubsub
istreeter committed Oct 16, 2024
1 parent 06a0d42 · commit 8b83f58
Showing 7 changed files with 120 additions and 106 deletions.
44 changes: 42 additions & 2 deletions .github/workflows/ci.yml
@@ -8,8 +8,10 @@ on:

jobs:
test:
runs-on: ubuntu-22.04
runs-on: ubuntu-latest
steps:
- name: Install sbt
uses: sbt/setup-sbt@v1
- uses: actions/checkout@v2
- uses: coursier/cache-action@v6
- name: Set up JDK 11
@@ -24,18 +26,56 @@ jobs:
publish_docker:
needs: test
if: github.ref_type == 'tag'
runs-on: ubuntu-22.04
runs-on: ubuntu-latest
strategy:
matrix:
sbtProject:
- azure
- gcp
- aws
- awsHudi
- gcpHudi
- azureHudi
- gcpBiglake
include:
- sbtProject: azure
runSnyk: true
targetDir: "modules/azure/target"
dockerSuffix: azure
dockerTagSuffix: ""
- sbtProject: gcp
runSnyk: true
targetDir: "modules/gcp/target"
dockerSuffix: gcp
dockerTagSuffix: ""
- sbtProject: aws
runSnyk: true
targetDir: "modules/aws/target"
dockerSuffix: aws
dockerTagSuffix: ""
- sbtProject: azureHudi
runSnyk: false
targetDir: "packaging/hudi/target/azure"
dockerSuffix: azure
dockerTagSuffix: "-hudi"
- sbtProject: gcpHudi
runSnyk: false
targetDir: "packaging/hudi/target/gcp"
dockerSuffix: gcp
dockerTagSuffix: "-hudi"
- sbtProject: awsHudi
runSnyk: false
targetDir: "packaging/hudi/target/aws"
dockerSuffix: aws
dockerTagSuffix: "-hudi"
- sbtProject: gcpBiglake
runSnyk: false
targetDir: "packaging/biglake/target/gcp"
dockerSuffix: gcp
dockerTagSuffix: "-biglake"
steps:
- name: Install sbt
uses: sbt/setup-sbt@v1
- name: Checkout Github
uses: actions/checkout@v2
- uses: coursier/cache-action@v6
2 changes: 2 additions & 0 deletions modules/gcp/src/main/resources/application.conf
@@ -15,6 +15,8 @@
"durationPerAckExtension": "10 minutes"
"minRemainingDeadline": 0.1
"maxMessagesPerPull": 1000
"debounceRequests": "100 millis"
"prefetch": 4
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
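The two settings added above control how eagerly the source talks to Pub/Sub: `debounceRequests` is the minimum spacing between pull requests, and `prefetch` is how many pulled batches may be buffered ahead of the downstream consumer. A minimal sketch of the resulting back-of-envelope arithmetic, using only the defaults visible in this hunk (the per-pull-slot throughput bound is an illustration, not a documented limit):

import scala.concurrent.duration._

// Illustrative arithmetic only, using the defaults above.
val debounceRequests   = 100.millis // minimum spacing between pull requests
val maxMessagesPerPull = 1000       // upper bound on messages returned by one pull

// Each pull slot can issue at most one request per debounce interval,
// so its throughput is capped at roughly:
val maxPullsPerSecond    = 1.second / debounceRequests            // = 10.0
val maxMessagesPerSecond = maxPullsPerSecond * maxMessagesPerPull // = 10000.0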
PubsubBatchState.scala
@@ -17,12 +17,8 @@ import java.time.Instant
* in the future. This is updated over time if we approach a deadline.
* @param ackIds
* The IDs which are needed to ack all messages in the batch
* @param channelAffinity
* Corresponds to the GRPC channel (TCP connection) on which this batch was pulled. We ack and
* modack on the same channel from where the messages came.
*/
private case class PubsubBatchState(
currentDeadline: Instant,
ackIds: Vector[String],
channelAffinity: Int
ackIds: Vector[String]
)
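With a single gRPC channel there is no longer a channel index to remember per batch, so the state tracked for each in-flight pull shrinks to the extension deadline and the ack IDs, keyed by a `Unique.Token` as the later hunks show. A minimal sketch of that bookkeeping (the `register` helper is hypothetical, for illustration only):

import cats.effect.{IO, Ref}
import cats.effect.kernel.Unique
import java.time.Instant

final case class PubsubBatchState(currentDeadline: Instant, ackIds: Vector[String])

// Hypothetical helper: record a freshly pulled batch under a new token.
def register(
  refStates: Ref[IO, Map[Unique.Token, PubsubBatchState]],
  deadline: Instant,
  ackIds: Vector[String]
): IO[Unique.Token] =
  for {
    token <- IO.unique
    _     <- refStates.update(_ + (token -> PubsubBatchState(deadline, ackIds)))
  } yield token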
PubsubCheckpointer.scala
@@ -7,13 +7,11 @@
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.implicits._
import cats.implicits._
import cats.effect.kernel.Unique
import cats.effect.{Async, Deferred, Ref, Sync}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.AcknowledgeRequest
import com.google.api.gax.grpc.GrpcCallContext
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

@@ -58,21 +56,17 @@ class PubsubCheckpointer[F[_]: Async](
for {
Resources(stub, refAckIds) <- deferredResources.get
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
grouped = ackDatas.groupBy(_.channelAffinity)
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
val attempt = for {
apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request, context))
_ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
} yield ()
attempt.retryingOnTransientGrpcFailures
.recoveringOnGrpcInvalidArgument { s =>
// This can happen if ack IDs have expired before we acked
Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
}
}
_ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
val attempt = for {
apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request))
_ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
} yield ()
attempt.retryingOnTransientGrpcFailures
.recoveringOnGrpcInvalidArgument { s =>
// This can happen if ack IDs have expired before we acked
Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
}
}
} yield ()
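The `modify` on the shared `Ref` removes the tokens being acked and returns whatever batch state was stored for them in one atomic step, so a token cannot be acked twice even if two fibers race; the ack IDs are then sent in groups of at most 1000 per `AcknowledgeRequest`, presumably to keep each request well within Pub/Sub's request size limits. A stripped-down sketch of the same remove-and-return pattern in isolation (names are illustrative):

import cats.effect.{IO, Ref}

// Atomically detach the entries for `keys` from the map and hand them back to the caller.
def takeAll[K, V](ref: Ref[IO, Map[K, V]], keys: List[K]): IO[List[V]] =
  ref.modify { m =>
    (m.removedAll(keys), keys.flatMap(m.get))
  }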

@@ -86,12 +80,9 @@
for {
Resources(stub, refAckIds) <- deferredResources.get
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
grouped = ackDatas.groupBy(_.channelAffinity)
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
val ackIds = ackDatas.flatMap(_.ackIds)
// A nack is just a modack with zero duration
Utils.modAck[F](subscription, stub, ackIds, Duration.Zero, channelAffinity)
}
ackIds = ackDatas.flatMap(_.ackIds)
// A nack is just a modack with zero duration
_ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero)
} yield ()
}
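`Utils.modAck` itself is not shown in the hunks above. In Pub/Sub terms a "modack" is a `ModifyAckDeadline` call, and setting the deadline to zero asks the service to redeliver immediately, which is exactly what nacking means here. A rough sketch of such a helper, assuming the same `SubscriberStub`-based style used elsewhere in this file (error handling and the 1000-ID grouping omitted; the plain `String` subscription and the exact signature are assumptions):

import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import cats.implicits._
import cats.effect.{Async, Sync}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.ModifyAckDeadlineRequest

def modAck[F[_]: Async](
  subscription: String, // the project wraps this in its own type; plain String here for brevity
  stub: SubscriberStub,
  ackIds: Vector[String],
  duration: Duration
): F[Unit] =
  if (ackIds.isEmpty) Async[F].unit
  else {
    val request = ModifyAckDeadlineRequest.newBuilder
      .setSubscription(subscription)
      .addAllAckIds(ackIds.asJava)
      .setAckDeadlineSeconds(duration.toSeconds.toInt) // zero seconds == nack
      .build
    for {
      // FutureInterop is the project's ApiFuture bridge used elsewhere in this diff
      apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request))
      _         <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
    } yield ()
  }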

PubsubSourceConfigV2.scala
@@ -23,8 +23,9 @@ case class PubsubSourceConfigV2(
durationPerAckExtension: FiniteDuration,
minRemainingDeadline: Double,
gcpUserAgent: GcpUserAgent,
maxPullsPerTransportChannel: Int,
maxMessagesPerPull: Int
maxMessagesPerPull: Int,
debounceRequests: FiniteDuration,
prefetch: Int
)

object PubsubSourceConfigV2 {
PubsubSourceV2.scala
@@ -10,15 +10,15 @@ package com.snowplowanalytics.snowplow.sources.pubsub.v2
import cats.effect.{Async, Deferred, Ref, Resource, Sync}
import cats.effect.kernel.Unique
import cats.implicits._
import fs2.{Chunk, Pipe, Stream}
import fs2.{Chunk, Stream}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.time.Instant

// pubsub
import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
import com.google.api.gax.grpc.{ChannelPoolSettings, GrpcCallContext}
import com.google.api.gax.grpc.ChannelPoolSettings
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings
import com.google.pubsub.v1.{PullRequest, PullResponse}
@@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.jdk.CollectionConverters._

import java.util.concurrent.{ExecutorService, Executors}
@@ -65,67 +65,72 @@ object PubsubSourceV2 {
): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
for {
parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config)))
channelCount = chooseNumTransportChannels(config, parallelPullCount)
stub <- Stream.resource(stubResource(config, channelCount))
stub <- Stream.resource(stubResource(config))
refStates <- Stream.eval(Ref[F].of(Map.empty[Unique.Token, PubsubBatchState]))
_ <- Stream.eval(deferredResources.complete(PubsubCheckpointer.Resources(stub, refStates)))
} yield Stream
.range(0, parallelPullCount)
.map(i => miniPubsubStream(config, stub, refStates, i))
.parJoinUnbounded
.fixedRateStartImmediately(config.debounceRequests, dampen = true)
.parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
.unNone
.repeat
.prefetchN(config.prefetch)
.concurrently(extendDeadlines(config, stub, refStates))
.onFinalize(nackRefStatesForShutdown(config, stub, refStates))
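This pipeline is the heart of the change: instead of `parallelPullCount` independent per-channel streams, a single ticker emits at most one tick per `debounceRequests` interval, each tick becomes one pull, at most `parallelPullCount` pulls run concurrently, empty responses are dropped by `unNone`, and up to `prefetch` pulled batches are buffered ahead of the consumer. A self-contained sketch of the same shape with a simulated pull (everything below is illustrative, not project code):

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import cats.effect.std.Random
import fs2.Stream

object DebouncedPullDemo extends IOApp.Simple {

  // Stands in for pullAndManageState: sometimes returns a batch, sometimes nothing.
  def simulatedPull(rng: Random[IO]): IO[Option[List[String]]] =
    rng.nextIntBounded(3).map {
      case 0 => None                          // empty PullResponse
      case n => Some(List.fill(n)("message")) // non-empty batch
    }

  val debounceRequests  = 100.millis
  val parallelPullCount = 3
  val prefetch          = 4

  def run: IO[Unit] =
    Stream
      .eval(Random.scalaUtilRandom[IO])
      .flatMap { rng =>
        Stream
          .fixedRateStartImmediately[IO](debounceRequests, dampen = true)  // debounced ticks
          .parEvalMapUnordered(parallelPullCount)(_ => simulatedPull(rng)) // bounded concurrent pulls
          .unNone                                                          // drop empty pulls
          .prefetchN(prefetch)                                             // buffer ahead of consumer
          .evalMap(batch => IO.println(s"processing ${batch.size} messages"))
      }
      .interruptAfter(2.seconds)
      .compile
      .drain
}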

private def miniPubsubStream[F[_]: Async](
private def pullAndManageState[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): Stream[F, LowLevelEvents[Vector[Unique.Token]]] =
Stream
.eval[F, PullResponse](pullFromSubscription(config, stub, channelAffinity))
.filter(_.getReceivedMessagesCount > 0)
.through(addToRefStates(config, stub, refStates, channelAffinity))
.repeat
.prefetch
.concurrently(extendDeadlines(config, stub, refStates, channelAffinity))
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
): F[Option[LowLevelEvents[Vector[Unique.Token]]]] =
pullFromSubscription(config, stub).flatMap { response =>
if (response.getReceivedMessagesCount > 0) {
val records = response.getReceivedMessagesList.asScala.toVector
val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
val (tstampSeconds, tstampNanos) =
records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
val ackIds = records.map(_.getAckId)
Sync[F].uncancelable { _ =>
for {
timeReceived <- Sync[F].realTimeInstant
_ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension)
token <- Unique[F].unique
currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
_ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds)))
} yield Some(LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong))))
}
} else {
none.pure[F]
}
}
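The bookkeeping after a non-empty pull runs under `uncancelable`: the ack deadline is extended first, and only then is the batch registered in `refStates`. If cancellation could strike between those two steps, the batch would be leased but invisible to the checkpointer and the deadline extender. A condensed sketch of the guarded pattern (the helpers are placeholders, not project code):

import cats.effect.IO

// Placeholder effects standing in for Utils.modAck and the Ref update.
def extendLease(ackIds: Vector[String]): IO[Unit] = IO.unit
def trackBatch(ackIds: Vector[String]): IO[Unit]  = IO.unit

// Either both effects happen or neither is observed by a canceller mid-way.
def registerPulledBatch(ackIds: Vector[String]): IO[Unit] =
  IO.uncancelable { _ =>
    extendLease(ackIds) *> trackBatch(ackIds)
  }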

private def pullFromSubscription[F[_]: Async](
private def nackRefStatesForShutdown[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
channelAffinity: Int
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
): F[Unit] =
refStates.getAndSet(Map.empty).flatMap { m =>
Utils.modAck(config.subscription, stub, m.values.flatMap(_.ackIds.toVector).toVector, Duration.Zero)
}

private def pullFromSubscription[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub
): F[PullResponse] = {
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
val request = PullRequest.newBuilder
.setSubscription(config.subscription.show)
.setMaxMessages(config.maxMessagesPerPull)
.build
val io = for {
apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request, context))
apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request))
res <- FutureInterop.fromFuture[F, PullResponse](apiFuture)
} yield res
io.retryingOnTransientGrpcFailures
Logger[F].trace("Pulling from subscription") *>
io.retryingOnTransientGrpcFailures
.flatTap { response =>
Logger[F].trace(s"Pulled ${response.getReceivedMessagesCount} messages")
}
}
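`FutureInterop.fromFuture` is the project's bridge from the client library's `ApiFuture` into the effect type; it is not part of this diff. A bridge of that kind typically looks roughly like the following (a hedged sketch under that assumption, not the project's implementation):

import cats.effect.Async
import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
import com.google.common.util.concurrent.MoreExecutors

def fromFuture[F[_]: Async, A](future: ApiFuture[A]): F[A] =
  Async[F].async_ { cb =>
    ApiFutures.addCallback(
      future,
      new ApiFutureCallback[A] {
        def onSuccess(result: A): Unit    = cb(Right(result))
        def onFailure(t: Throwable): Unit = cb(Left(t))
      },
      MoreExecutors.directExecutor() // run the callback on the completing thread
    )
  }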

private def addToRefStates[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): Pipe[F, PullResponse, LowLevelEvents[Vector[Unique.Token]]] =
_.evalMap { response =>
val records = response.getReceivedMessagesList.asScala.toVector
val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
val (tstampSeconds, tstampNanos) =
records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
val ackIds = records.map(_.getAckId)
for {
timeReceived <- Sync[F].realTimeInstant
_ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
token <- Unique[F].unique
currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
_ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds, channelAffinity)))
} yield LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)))
}

/**
* Modify ack deadlines if we need more time to process the messages
*
@@ -136,15 +141,11 @@
* @param refStates
* A map from tokens to the data held about a batch of messages received from pubsub. This
* function must update the state if it extends a deadline.
* @param channelAffinity
* Identifies the GRPC channel (TCP connection) creating these Actions. Each GRPC channel has
* its own concurrent stream modifying the ack deadlines.
*/
private def extendDeadlines[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]]
): Stream[F, Nothing] =
Stream
.eval(Sync[F].realTimeInstant)
@@ -153,7 +154,7 @@
val newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis)
refStates.modify { m =>
val toExtend = m.filter { case (_, batchState) =>
batchState.channelAffinity === channelAffinity && batchState.currentDeadline.isBefore(minAllowedDeadline)
batchState.currentDeadline.isBefore(minAllowedDeadline)
}
val fixed = toExtend.view
.mapValues(_.copy(currentDeadline = newDeadline))
@@ -166,24 +167,22 @@
Sync[F].sleep(0.5 * config.minRemainingDeadline * config.durationPerAckExtension)
else {
val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds)
Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension)
}
}
.repeat
.drain
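With the defaults from application.conf above, the numbers work out as follows: a batch's deadline is extended whenever less than `minRemainingDeadline × durationPerAckExtension` of its lease remains, and when nothing needs extending the loop sleeps for half of that margin before checking again. A small illustration of the arithmetic, using only values taken from this diff:

import scala.concurrent.duration._

val durationPerAckExtension = 10.minutes
val minRemainingDeadline    = 0.1

// Extend a batch once its deadline is closer than this:
val minAllowedRemaining = minRemainingDeadline * durationPerAckExtension // = 1 minute

// When nothing needs extending, sleep this long before the next check:
val idleSleep = 0.5 * minRemainingDeadline * durationPerAckExtension // = 30 seconds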

private def stubResource[F[_]: Async](
config: PubsubSourceConfigV2,
channelCount: Int
config: PubsubSourceConfigV2
): Resource[F, SubscriberStub] =
for {
executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2)))
subStub <- buildSubscriberStub(config, channelCount, FixedExecutorProvider.create(executor))
subStub <- buildSubscriberStub(config, FixedExecutorProvider.create(executor))
} yield subStub

private def buildSubscriberStub[F[_]: Sync](
config: PubsubSourceConfigV2,
channelCount: Int,
executorProvider: ExecutorProvider
): Resource[F, GrpcSubscriberStub] = {
val channelProvider = SubscriptionAdminSettings
Expand All @@ -192,7 +191,7 @@ object PubsubSourceV2 {
.setMaxInboundMetadataSize(20 << 20)
.setKeepAliveTime(ThreetenDuration.ofMinutes(5))
.setChannelPoolSettings {
ChannelPoolSettings.staticallySized(channelCount)
ChannelPoolSettings.staticallySized(1)
}
.build

@@ -223,16 +222,4 @@
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

/**
* Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection)
*
* GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy
* to stay much under that limit. If we need to open a large number of streaming pulls then we
* might approach/exceed that limit.
*/
private def chooseNumTransportChannels(config: PubsubSourceConfigV2, parallelPullCount: Int): Int =
(BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

}
