Merge pull request #330 from permutive/increase-chunk-size
CremboC authored Jan 5, 2022
2 parents 74ccdc6 + 6de7203 commit 27466a7
Showing 9 changed files with 528 additions and 58 deletions.
PubsubGoogleConsumer.scala
@@ -38,15 +38,22 @@ object PubsubGoogleConsumer {
    errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit],
    config: PubsubGoogleConsumerConfig[F]
  ): Stream[F, ConsumerRecord[F, A]] =
-    PubsubSubscriber
-      .subscribe(projectId, subscription, config)
-      .flatMap { case internal.Model.Record(msg, ack, nack) =>
-        MessageDecoder[A].decode(msg.getData.toByteArray) match {
-          case Left(e) => Stream.exec(errorHandler(msg, e, ack, nack))
-          case Right(v) =>
-            Stream.emit(ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit))
-        }
-      }
+    subscribeDecode[F, A, ConsumerRecord[F, A]](
+      projectId,
+      subscription,
+      errorHandler,
+      config,
+      onDecode = (record, value) =>
+        Applicative[F].pure(
+          ConsumerRecord(
+            value,
+            record.value.getAttributesMap.asScala.toMap,
+            record.ack,
+            record.nack,
+            _ => Applicative[F].unit
+          )
+        ),
+    )

/**
* Subscribe with automatic acknowledgement
@@ -63,14 +70,13 @@ object PubsubGoogleConsumer {
    errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit],
    config: PubsubGoogleConsumerConfig[F]
  ): Stream[F, A] =
-    PubsubSubscriber
-      .subscribe(projectId, subscription, config)
-      .flatMap { case internal.Model.Record(msg, ack, nack) =>
-        MessageDecoder[A].decode(msg.getData.toByteArray) match {
-          case Left(e) => Stream.exec(errorHandler(msg, e, ack, nack))
-          case Right(v) => Stream.eval(ack >> v.pure)
-        }
-      }
+    subscribeDecode[F, A, A](
+      projectId,
+      subscription,
+      errorHandler,
+      config,
+      onDecode = (record, value) => record.ack.as(value),
+    )

/**
* Subscribe to the raw stream, receiving the message as retrieved from PubSub
@@ -87,4 +93,21 @@
      .map(msg =>
        ConsumerRecord(msg.value, msg.value.getAttributesMap.asScala.toMap, msg.ack, msg.nack, _ => Applicative[F].unit)
      )
+
+  private def subscribeDecode[F[_]: Sync, A: MessageDecoder, B](
+    projectId: Model.ProjectId,
+    subscription: Model.Subscription,
+    errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit],
+    config: PubsubGoogleConsumerConfig[F],
+    onDecode: (internal.Model.Record[F], A) => F[B],
+  ): Stream[F, B] =
+    PubsubSubscriber
+      .subscribe(projectId, subscription, config)
+      .evalMapChunk[F, Option[B]](record =>
+        MessageDecoder[A].decode(record.value.getData.toByteArray) match {
+          case Left(e)  => errorHandler(record.value, e, record.ack, record.nack).as(None)
+          case Right(v) => onDecode(record, v).map(Some(_))
+        }
+      )
+      .unNone
}
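
Worth pausing on the `evalMapChunk`/`unNone` combination in `subscribeDecode`: the old `flatMap`-based decode rebuilt the stream out of single-element chunks, which would defeat the batching that `takeNextElements` (in `PubsubSubscriber` below) now provides, whereas `evalMapChunk` evaluates the effect per element but preserves the incoming chunk boundaries. A minimal, self-contained sketch of the difference (fs2 3.x and cats-effect 3 assumed; names and values are illustrative only):

```scala
import cats.effect.{IO, IOApp}
import fs2.{Chunk, Stream}

object ChunkinessDemo extends IOApp.Simple {

  // One incoming chunk of eight elements, standing in for a drained batch.
  val input: Stream[IO, Int] = Stream.chunk(Chunk.seq(1 to 8))

  // `flatMap` concatenates one single-element stream per element,
  // so the result is made of eight singleton chunks.
  val viaFlatMap: Stream[IO, Int] = input.flatMap(n => Stream.emit(n * 2))

  // `evalMapChunk` runs the effect per element but keeps the original
  // chunk structure, so the result is still a single chunk of eight.
  val viaEvalMapChunk: Stream[IO, Int] = input.evalMapChunk(n => IO.pure(n * 2))

  def run: IO[Unit] =
    for {
      a <- viaFlatMap.chunks.map(_.size).compile.toList
      b <- viaEvalMapChunk.chunks.map(_.size).compile.toList
      _ <- IO.println(s"flatMap chunk sizes:      $a") // List(1, 1, 1, 1, 1, 1, 1, 1)
      _ <- IO.println(s"evalMapChunk chunk sizes: $b") // List(8)
    } yield ()
}
```

The `Option`/`unNone` dance exists because a decode failure produces no element: the failed message is handed to `errorHandler` and its slot is dropped without breaking the surrounding chunk.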
PubsubSubscriber.scala
@@ -11,9 +11,11 @@ import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumer.InternalPubSubError
import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumerConfig
import com.permutive.pubsub.consumer.{Model => PublicModel}
-import fs2.Stream
+import fs2.{Chunk, Stream}
import org.threeten.bp.Duration

+import java.util
+import collection.JavaConverters._
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

private[consumer] object PubsubSubscriber {
@@ -69,11 +71,21 @@ private[consumer] object PubsubSubscriber {
queue.put(Left(InternalPubSubError(failure)))
}

-  def takeNextElement[F[_]: Sync, A](messages: BlockingQueue[A]): F[A] =
+  def takeNextElements[F[_]: Sync, A](messages: BlockingQueue[A]): F[Chunk[A]] =
    for {
-      nextOpt <- Sync[F].delay(Option(messages.poll())) // `poll` is non-blocking, returning `null` if queue is empty
-      next <- nextOpt.fold(Sync[F].blocking(messages.take()))(Applicative[F].pure) // `take` can wait for an element
-    } yield next
+      nextOpt <- Sync[F].delay(messages.poll()) // `poll` is non-blocking, returning `null` if queue is empty
+      // `take` can wait for an element
+      next <-
+        if (nextOpt == null) Sync[F].interruptible(many = true)(messages.take())
+        else Applicative[F].pure(nextOpt)
+      chunk <- Sync[F].delay {
+        val elements = new util.ArrayList[A]
+        elements.add(next)
+        messages.drainTo(elements)
+
+        Chunk.buffer(elements.asScala)
+      }
+    } yield chunk

def subscribe[F[_]: Sync](
projectId: PublicModel.ProjectId,
@@ -84,8 +96,9 @@
queue <- Stream.eval(
Sync[F].delay(new LinkedBlockingQueue[Either[InternalPubSubError, Model.Record[F]]](config.maxQueueSize))
)
-      _ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue))
-      next <- Stream.repeatEval(takeNextElement(queue))
-      msg <- Stream.fromEither[F](next)
+      _ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue))
+      taken <- Stream.repeatEval(takeNextElements(queue))
+      // Only retains the first error (if there are multiple), but that is OK, the stream is failing anyway...
+      msg <- Stream.fromEither[F](taken.sequence).unchunks
} yield msg
}
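
The other half of the change is `takeNextElements` above: rather than surfacing one message per effect, it blocks for at most one element and then opportunistically drains whatever else the underlying Google client has already buffered into a single `Chunk`. Stripped of the effect wrapper, the queue interaction is just this (a standalone sketch, not library code; names are illustrative):

```scala
import java.util
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._

object DrainDemo {

  // Take at least one element, blocking only if the queue is empty,
  // then move everything else already enqueued across in one pass.
  def takeBatch[A](queue: LinkedBlockingQueue[A]): Vector[A] = {
    // Fast path: `poll` is non-blocking and returns null on an empty queue.
    val first = Option(queue.poll()).getOrElse(queue.take()) // `take` blocks
    val buffer = new util.ArrayList[A]
    buffer.add(first)
    queue.drainTo(buffer) // non-blocking bulk transfer of pending elements
    buffer.asScala.toVector
  }
}
```

Two details carry over to the effectful version: only the empty-queue fallback (`take`) is wrapped in `Sync[F].interruptible`, since it is the only call that can actually block, and `taken.sequence` then collapses the resulting `Chunk[Either[InternalPubSubError, Record]]` into an `Either[..., Chunk[Record]]`, keeping just the first error, which is fine because any error fails the whole stream.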
PubsubPublisher.scala
@@ -19,9 +19,11 @@ private[producer] object PubsubPublisher {
): Resource[F, Publisher] =
Resource[F, Publisher] {
Sync[F].delay {
+        val topicName = ProjectTopicName.of(projectId.value, topic.value)
+
        val publisherBuilder =
          Publisher
-            .newBuilder(ProjectTopicName.of(projectId.value, topic.value))
+            .newBuilder(topicName)
.setBatchingSettings(
BatchingSettings
.newBuilder()
GrpcPingPongSpec.scala (new file)
@@ -0,0 +1,164 @@
package com.permutive.pubsub

import cats.effect._
import cats.syntax.all._
import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, TopicAdminClient}
import com.google.pubsub.v1.{ProjectSubscriptionName, TopicName}
import com.permutive.pubsub.consumer.ConsumerRecord
import com.permutive.pubsub.producer.PubsubProducer
import fs2.Stream
import org.scalatest.BeforeAndAfterEach
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration._

class GrpcPingPongSpec extends PubSubSpec with BeforeAndAfterEach {

implicit val logger: Logger[IO] = Slf4jLogger.getLogger

// Delete topic and subscriptions after each test to ensure state is clean
override def afterEach(): Unit =
clearTopicSubscription
.handleErrorWith(_ => logger.warn("Errors were thrown after tests on clean-up"))
.unsafeRunSync()

private[this] val topicAndSubscriptionClient: Resource[IO, (TopicAdminClient, SubscriptionAdminClient)] =
for {
transCreds <- providers
topicClient <- topicAdminClient(transCreds._1, transCreds._2)
subscriptionClient <- subscriptionAdminClient(transCreds._1, transCreds._2)
} yield (topicClient, subscriptionClient)

private[this] val clearTopicSubscription: IO[Unit] =
topicAndSubscriptionClient.use { case (topicClient, subscriptionClient) =>
for {
_ <- deleteSubscription(subscriptionClient, ProjectSubscriptionName.of(project, subscription))
_ <- deleteTopic(topicClient, TopicName.of(project, topic))
} yield ()
}

private def setup(ackDeadlineSeconds: Int): Resource[IO, PubsubProducer[IO, ValueHolder]] =
for {
_ <- Resource.eval(createTopic(project, topic))
_ <- Resource.eval(createSubscription(project, topic, subscription, ackDeadlineSeconds))
p <- producer()
} yield p

private def consumeExpectingLimitedMessages(
messagesExpected: Int,
): Stream[IO, ConsumerRecord[IO, ValueHolder]] =
consumer()
// Check we receive no more than `messagesExpected` elements
.zipWithIndex
.flatMap { case (record, ix) =>
Stream.fromEither[IO](
// Index is 0-based, so we expect the index to reach 1 less than `messagesExpected`
if (ix < messagesExpected.toLong) Right(record)
else Left(new RuntimeException(s"Received more than $messagesExpected from PubSub"))
)
}
// Check body is as we expect
.flatMap(record =>
Stream.fromEither[IO](
if (record.value == ValueHolder("ping")) Right(record)
else Left(new RuntimeException(s"Consumed element did not have correct value: ${record.value}"))
)
)

private def consumeAndAck(
elementsReceived: Ref[IO, Int],
): Stream[IO, ConsumerRecord[IO, ValueHolder]] =
consumeExpectingLimitedMessages(messagesExpected = 1)
// Indicate we have received the element as expected
.evalTap(_ => elementsReceived.update(_ + 1))
.evalTap(_.ack)

it should "send and receive a message, acknowledging as expected" in {
(for {
// We will sleep for 10 seconds, which means if the message is not acked it will be redelivered before the end of the test
producer <- Stream.resource(setup(ackDeadlineSeconds = 5))
_ <- Stream.eval(producer.produce(ValueHolder("ping")))
ref <- Stream.eval(Ref.of[IO, Int](0))
// Wait 10 seconds whilst we run the consumer to check we have a single element and it has the right data
_ <- Stream.sleep[IO](10.seconds).concurrently(consumeAndAck(ref))
elementsReceived <- Stream.eval(ref.get)
} yield elementsReceived should ===(1)).compile.drain
.timeout(30.seconds) // avoiding running forever in case of an issue
.unsafeRunSync()
}

private def consumeExtendSleepAck(
elementsReceived: Ref[IO, Int],
extendDuration: FiniteDuration,
sleepDuration: FiniteDuration,
): Stream[IO, ConsumerRecord[IO, ValueHolder]] =
consumeExpectingLimitedMessages(messagesExpected = 1)
.evalTap(_.extendDeadline(extendDuration))
.evalTap(_ => IO.sleep(sleepDuration))
// Indicate we have received the element as expected
.evalTap(_ => elementsReceived.update(_ + 1))
.evalTap(_.ack)

it should "extend the deadline for a message" in {
// These settings mean that if extension does not work the message will be redelivered before the end of the test
val ackDeadlineSeconds = 2
val sleepDuration = 3.seconds
val extendDuration = 10.seconds

(for {
producer <- Stream.resource(setup(ackDeadlineSeconds = ackDeadlineSeconds))
_ <- Stream.eval(producer.produce(ValueHolder("ping")))
ref <- Stream.eval(Ref.of[IO, Int](0))
// Wait 10 seconds whilst we run the consumer to check we have a single element and it has the right data
_ <- Stream
.sleep[IO](10.seconds)
.concurrently(
consumeExtendSleepAck(ref, extendDuration = extendDuration, sleepDuration = sleepDuration)
)
elementsReceived <- Stream.eval(ref.get)
} yield elementsReceived should ===(1))
.as(ExitCode.Success)
.compile
.drain
.timeout(30.seconds) // avoiding running forever in case of an issue
.unsafeRunSync()
}

private def consumeNackThenAck(
elementsReceived: Ref[IO, Int],
messagesExpected: Int,
): Stream[IO, Unit] =
// We expect the message to be redelivered once, so expect 2 messages total
consumeExpectingLimitedMessages(messagesExpected)
// Indicate we have received the element as expected
.evalTap(_ => elementsReceived.update(_ + 1))
// Nack the first message, then ack subsequent ones
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void

it should "nack a message properly" in {
// These settings mean that a message will only be redelivered if it is nacked
val ackDeadlineSeconds = 100
val messagesExpected = 2

(for {
producer <- Stream.resource(setup(ackDeadlineSeconds = ackDeadlineSeconds))
_ <- Stream.eval(producer.produce(ValueHolder("ping")))
ref <- Stream.eval(Ref.of[IO, Int](0))
// Wait 10 seconds whilst we run the consumer which nacks the message, then acks
_ <- Stream
.sleep[IO](10.seconds)
.concurrently(consumeNackThenAck(ref, messagesExpected))
elementsReceived <- Stream.eval(ref.get)
} yield elementsReceived should ===(messagesExpected))
.as(ExitCode.Success)
.compile
.drain
.timeout(30.seconds) // avoiding running forever in case of an issue
.unsafeRunSync()
}

}
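
One pattern in the new spec worth calling out: `consumeNackThenAck` threads a single Boolean through the stream with `evalScan` so that only the first delivery is nacked. A cut-down sketch of that state-threading trick (the record type here is a stand-in, not the library's `ConsumerRecord`):

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream

// Stand-in record whose ack/nack merely log, to make the branch visible.
final case class FakeRecord(id: Int) {
  val ack: IO[Unit]  = IO.println(s"ack  $id")
  val nack: IO[Unit] = IO.println(s"nack $id")
}

object NackThenAckDemo extends IOApp.Simple {

  val records: Stream[IO, FakeRecord] =
    Stream(FakeRecord(1), FakeRecord(2), FakeRecord(3))

  // The accumulator starts false; the first record is nacked and flips it
  // to true, so every subsequent record takes the ack branch.
  def run: IO[Unit] =
    records
      .evalScan(false) { case (nackedAlready, record) =>
        if (nackedAlready) record.ack.as(true) else record.nack.as(true)
      }
      .compile
      .drain
}
```

Against a real subscription the nacked message comes back, which is why the test expects `messagesExpected = 2` deliveries in total.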