Skip to content

Commit

Permalink
Pubsub source v2 pre-fetches more messages when blocked
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 13, 2024
1 parent e8ea8ea commit bb4eef6
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 215 deletions.
5 changes: 2 additions & 3 deletions modules/gcp/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
"durationPerAckExtension": "10 minutes"
"minRemainingDeadline": 0.1
"progressTimeout": "10 seconds"
"modackOnProgressTimeout": true
"cancelOnProgressTimeout": false
"consistentClientId": true
"prefetchMin": 1
"prefetchMax": 10
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.{Async, Ref, Resource, Sync}
import cats.effect.kernel.Unique
import cats.implicits._
import cats.effect.implicits._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.google.cloud.pubsub.v1.stub.SubscriberStub

private trait LeaseManager[F[_], A] {
def manageLeases(in: A): F[Unique.Token]
def stopManagingLeases(tokens: Vector[Unique.Token]): F[Unit]
}

private object LeaseManager {

private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]

def resource[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
ref: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): Resource[F, LeaseManager[F, SubscriberAction.ProcessRecords]] =
extendDeadlinesInBackground[F](config, stub, ref, channelAffinity)
.as(impl(config, ref, channelAffinity))

private def impl[F[_]: Sync](
config: PubsubSourceConfigV2,
ref: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): LeaseManager[F, SubscriberAction.ProcessRecords] = new LeaseManager[F, SubscriberAction.ProcessRecords] {

def manageLeases(in: SubscriberAction.ProcessRecords): F[Unique.Token] =
Unique[F].unique.flatMap { token =>
val deadline = in.timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
val ackIds = in.records.map(_.getAckId)
val state = PubsubBatchState(deadline, ackIds, channelAffinity)
ref.update(_ + (token -> state)).as(token)
}

def stopManagingLeases(tokens: Vector[Unique.Token]): F[Unit] =
ref.update(_.removedAll(tokens))
}

private def extendDeadlinesInBackground[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): Resource[F, Unit] = {
def go: F[Unit] = for {
now <- Sync[F].realTimeInstant
minAllowedDeadline = now.plusMillis((config.minRemainingDeadline * config.durationPerAckExtension.toMillis).toLong)
newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis)
toExtend <- refStates.modify { m =>
val toExtend = m.filter { case (_, batchState) =>
batchState.channelAffinity === channelAffinity && batchState.currentDeadline.isBefore(minAllowedDeadline)
}
val fixed = toExtend.view
.mapValues(_.copy(currentDeadline = newDeadline))
.toMap
(m ++ fixed, toExtend.values.toVector)
}
_ <- if (toExtend.isEmpty)
Sync[F].sleep(0.5 * config.minRemainingDeadline * config.durationPerAckExtension)
else {
val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds.toVector)
Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
}
_ <- go
} yield ()
go.background.void
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.data.NonEmptyVector

import java.time.Instant

/**
Expand All @@ -23,6 +25,6 @@ import java.time.Instant
*/
private case class PubsubBatchState(
currentDeadline: Instant,
ackIds: Vector[String],
ackIds: NonEmptyVector[String],
channelAffinity: Int
)
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class PubsubCheckpointer[F[_]: Async](
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
grouped = ackDatas.groupBy(_.channelAffinity)
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
ackDatas.flatMap(_.ackIds.toVector).grouped(1000).toVector.traverse_ { ackIds =>
val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
val attempt = for {
Expand Down Expand Up @@ -88,7 +88,7 @@ class PubsubCheckpointer[F[_]: Async](
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
grouped = ackDatas.groupBy(_.channelAffinity)
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
val ackIds = ackDatas.flatMap(_.ackIds)
val ackIds = ackDatas.flatMap(_.ackIds.toVector)
// A nack is just a modack with zero duration
Utils.modAck[F](subscription, stub, ackIds, Duration.Zero, channelAffinity)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ case class PubsubSourceConfigV2(
gcpUserAgent: GcpUserAgent,
maxPullsPerTransportChannel: Int,
progressTimeout: FiniteDuration,
modackOnProgressTimeout: Boolean,
cancelOnProgressTimeout: Boolean,
consistentClientId: Boolean
prefetchMin: Int,
prefetchMax: Int
)

object PubsubSourceConfigV2 {
Expand Down
Loading

0 comments on commit bb4eef6

Please sign in to comment.