Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
Fail stream on internal Google PubSub library error
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and oguzhanunlu committed Aug 11, 2021
1 parent a365116 commit ec30f57
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

import cats.effect.{Blocker, ContextShift, Resource, Sync}
import cats.syntax.all._
import com.google.api.core.ApiService
import com.google.api.gax.batching.FlowControlSettings
import com.google.common.util.concurrent.MoreExecutors
import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import com.permutive.pubsub.consumer.{Model => PublicModel}
Expand All @@ -20,13 +22,13 @@ private[consumer] object PubsubSubscriber {
config: PubsubGoogleConsumerConfig[F]
)(implicit
F: Sync[F]
): Resource[F, BlockingQueue[Model.Record[F]]] =
Resource[F, BlockingQueue[Model.Record[F]]] {
): Resource[F, BlockingQueue[Either[Throwable, Model.Record[F]]]] =
Resource[F, BlockingQueue[Either[Throwable, Model.Record[F]]]] {
Sync[F].delay {
val messages = new LinkedBlockingQueue[Model.Record[F]](config.maxQueueSize)
val messages = new LinkedBlockingQueue[Either[Throwable, Model.Record[F]]](config.maxQueueSize)
val receiver = new MessageReceiver {
override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit =
messages.put(Model.Record(message, Sync[F].delay(consumer.ack()), Sync[F].delay(consumer.nack())))
messages.put(Right(Model.Record(message, Sync[F].delay(consumer.ack()), Sync[F].delay(consumer.nack()))))
}
val subscriptionName = ProjectSubscriptionName.of(projectId.value, subscription.value)

Expand All @@ -49,6 +51,7 @@ private[consumer] object PubsubSubscriber {
.map(f => f(builder))
.getOrElse(builder)
.build()
sub.addListener(new PubsubErrorListener(messages), MoreExecutors.directExecutor)

val service = sub.startAsync()
val shutdown =
Expand All @@ -60,6 +63,11 @@ private[consumer] object PubsubSubscriber {
}
}

class PubsubErrorListener[R](messages: BlockingQueue[Either[Throwable, R]]) extends ApiService.Listener {
override def failed(from: ApiService.State, failure: Throwable): Unit =
messages.put(Left(failure))
}

def subscribe[F[_]: Sync: ContextShift](
blocker: Blocker,
projectId: PublicModel.ProjectId,
Expand All @@ -68,6 +76,7 @@ private[consumer] object PubsubSubscriber {
): Stream[F, Model.Record[F]] =
for {
queue <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config))
msg <- Stream.repeatEval(blocker.delay(queue.take()))
next <- Stream.repeatEval(blocker.delay(queue.take()))
msg <- next.fold(Stream.raiseError(_), Stream.emit(_))
} yield msg
}

0 comments on commit ec30f57

Please sign in to comment.