Skip to content

Commit

Permalink
Temporarily enable logging of pubsub message IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 8, 2024
1 parent d883f4e commit 042f02a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
1 change: 1 addition & 0 deletions modules/gcp/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"maxMessagesPerPull": 1000
"debounceRequests": "100 millis"
"prefetch": 4
"logMessageIds": false
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,24 @@ case class PubsubSourceConfigV2(
gcpUserAgent: GcpUserAgent,
maxMessagesPerPull: Int,
debounceRequests: FiniteDuration,
prefetch: Int
prefetch: Int,
logMessageIds: PubsubSourceConfigV2.CustomBoolean
)

object PubsubSourceConfigV2 {

case class Subscription(projectId: String, subscriptionId: String)

case class CustomBoolean(value: Boolean) extends AnyVal

object CustomBoolean {
implicit def decoder: Decoder[CustomBoolean] =
Decoder.decodeBoolean
.or(Decoder.decodeString.emapTry(s => scala.util.Try(s.toBoolean)))
.map(CustomBoolean(_))

}

object Subscription {
implicit def show: Show[Subscription] = Show[Subscription] { s =>
ProjectSubscriptionName.of(s.projectId, s.subscriptionId).toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ object PubsubSourceV2 {
val ackIds = records.map(_.getAckId)
Sync[F].uncancelable { _ =>
for {
_ <- if (config.logMessageIds.value) Sync[F].delay {
println(records.map(_.getMessage.getMessageId).mkString("Pubsub message IDs: ", ",", ""))
}
else Sync[F].unit
timeReceived <- Sync[F].realTimeInstant
_ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension)
token <- Unique[F].unique
Expand Down

0 comments on commit 042f02a

Please sign in to comment.