diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala index 9493b880..a7aba152 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala @@ -100,8 +100,11 @@ object Flow { .as(Inserted.asInstanceOf[InsertStatus]) .value } else { - Logger[F].debug(s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Nack") >> - event.nack.as(Retry.asInstanceOf[InsertStatus].asRight) + Logger[F] + .debug( + s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Ignoring it so PubSub re-sends it later." + ) + .as(Retry.asInstanceOf[InsertStatus].asRight) } } } yield result diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala index 83a6a4b2..59297b35 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala @@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor import cats.effect._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import scala.concurrent.duration.DurationInt object Repeater extends IOApp { @@ -34,7 +35,8 @@ object Repeater extends IOApp { resources.env.projectId, resources.env.config.input.subscription, resources.uninsertable, - resources.env.gcpUserAgent + resources.env.gcpUserAgent, + command.backoffPeriod.seconds ) .interruptWhen(resources.stop) .through[IO, Unit](Flow.sink(resources)) diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala index f5a5f124..72e73717 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala @@ -24,6 +24,8 @@ import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConf import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader import fs2.Stream import org.typelevel.log4cats.Logger +import org.threeten.bp.{Duration => ThreetenDuration} +import scala.concurrent.duration.{DurationInt, FiniteDuration} /** Module responsible for reading Pub/Sub */ object PubSub { @@ -33,18 +35,26 @@ object PubSub { projectId: String, subscription: String, uninsertable: Queue[F, BadRow], - gcpUserAgent: GcpUserAgent + gcpUserAgent: GcpUserAgent, + backoffPeriod: FiniteDuration ): Stream[F, ConsumerRecord[F, EventContainer]] = PubsubGoogleConsumer.subscribe[F, EventContainer]( Model.ProjectId(projectId), Model.Subscription(subscription), (msg, err, ack, _) => callback[F](msg, err, ack, uninsertable), PubsubGoogleConsumerConfig[F]( - onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), - customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))) + onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), + customizeSubscriber = Some { + _.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + .setMaxAckExtensionPeriod(convertDuration(backoffPeriod.min(1.hour))) + .setMinDurationPerAckExtension(convertDuration(backoffPeriod.min(600.seconds).minus(1.second))) + } ) ) + private def convertDuration(d: FiniteDuration): ThreetenDuration = + ThreetenDuration.ofMillis(d.toMillis) + private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = { val info = FailureDetails.LoaderRecoveryError.ParsingError(err.toString, Nil) val failure = Failure.LoaderRecoveryFailure(info)