Skip to content

Latest commit

 

History

History
228 lines (169 loc) · 9.66 KB

README.md

File metadata and controls

228 lines (169 loc) · 9.66 KB

Op-Rabbit

An opinionated RabbitMQ library for Scala and Akka.

See the latest API Docs:

Intro

Op-Rabbit is a high-level, opinionated, composable, fault-tolerant library for interacting with RabbitMQ; the following is a high-level feature list:

  • Recovery:
    • Consumers automatically reconnect and subscribe if the connection is lost
    • Messages published will wait for a connection to be available
  • Integration
    • Connection settings pulled from Typesafe config library
    • Asyncronous, concurrent consumption using Scala native Futures or the new Akka Streams project.
    • Common pattern for serialization allows easy integration with serialization libraries such play-json or json4s
    • Common pattern for exception handling to publish errors to Airbrake, Syslog, or all of the above
  • Modular
    • Composition favored over inheritance enabling flexible and high code reuse.
  • Modeled
    • Queue binding, exchange binding modeled with case classes
    • Publishing mechansims also modeled
  • Reliability
    • Builds on the excellent Akka RabbitMQ client library for easy recovery.
    • Built-in consumer error recovery strategy in which messages are re-delivered to the message queue and retried (not implemented for akka-streams integration as retry mechanism affects message order)
    • With a single message, pause all consumers if service health check fails (IE: database unavailable); easily resume the same.
  • Graceful shutdown
    • Consumers and streams can immediately unsubscribe, but stay alive long enough to wait for any messages to finish being processed.
  • Program at multiple levels of abstraction
    • If op-rabbit doesn't do what you need it to, you can either extend op-rabbit or interact directly with akka-rabbitmq Akka RabbitMQ client.
  • Tested
    • Extensive integration tests

Installation

Add the SpinGo OSS repository and include the dependencies of your choosing:

resolvers ++= Seq(
  "SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases"
)

val opRabbitVersion = "1.0.0-M1"

libraryDependencies ++= Seq(
  "com.spingo" %% "op-rabbit-core"        % opRabbitVersion,
  "com.spingo" %% "op-rabbit-play-json"   % opRabbitVersion,
  "com.spingo" %% "op-rabbit-json4s"      % opRabbitVersion,
  "com.spingo" %% "op-rabbit-airbrake"    % opRabbitVersion,
  "com.spingo" %% "op-rabbit-akka-stream" % opRabbitVersion
)

A high-level overview of the available components:

  • op-rabbit-core API
    • Implements basic patterns for serialization and message processing.
  • op-rabbit-play-json API
    • Easily use Play Json formats to publish or consume messages; automatically sets RabbitMQ message headers to indicate content type.
  • op-rabbit-json4s API
    • Easily use Json4s to serialization messages; automatically sets RabbitMQ message headers to indicate content type.
  • op-rabbit-airbrake API
    • Report consumer exceptions to airbrake.
  • op-rabbit-akka-stream API
    • Process or publish messages using akka-stream.

Usage

Set up RabbitMQ connection information in application.conf:

rabbitmq {
  topic-exchange-name = "op-rabbit-testeroni"
  hosts = ["127.0.0.1"]
  username = "guest"
  password = "guest"
  port = 5672
  timeout = 3s
}

Note that hosts is an array; Connection attempts will be made to hosts in that order, with a default timeout of 3s. This way you can specify addresses of your rabbitMQ cluster, and if one of the instances goes down, your application will automatically reconnect to another member of the cluster.

topic-exchange-name is the default topic exchange to use; this can be overriden by passing exchange = "my-topic" to TopicBinding or TopicMessage.

Boot up the RabbitMQ control actor:

implicit val actorSystem = ActorSystem("such-system")
val rabbitMq = actorSystem.actorOf(Props[RabbitControl])

Set up a consumer: (Topic subscription)

(this example uses op-rabbit-play-json)

import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit._

implicit val personFormat = Json.format[Person] // setup play-json serializer

// A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
val consumer = AsyncAckingConsumer("PersonSignup", qos = 3) { person: Person =>
  Future {
    // do work; when this Future completes, the message will be acknowledged.
    // if the Future fails, after a delay the message will be redelivered for retry (up to 3 times, by default)
  }
}

val subscription = new Subscription(
  TopicBinding(
      queueName = "such-message-queue",
      topics = List("some-topic.#"),
  consumer)

rabbitMq ! subscription

The following methods are available on subscription:

// stop receiving new messages from RabbitMQ immediately; shut down consumer and channel as soon as pending messages are completed. A grace period of 30 seconds is given, after which the subscription forcefully shuts down.
subscription.close(30 seconds)

// Shut things down without a grace period
subscription.abort()

// Future[Unit] which completes once the provided binding has been applied (IE: queue has been created and topic bindings configured). Useful if you need to assert you don't send a message before a message queue is created in which to place it.
subscription.initialized

// Future[Unit] which completes when the subscription is closed.
subscription.closed

// Future[Unit] which completes when the subscription begins closing.
subscription.closing

Publish a message:

rabbitMq ! TopicMessage(Person(name = "Mike How", age = 33), routingKey = "some-topic.very-interest")

rabbitMq ! QueueMessage(Person(name = "Ivanah Tinkle", age = 25), queue = "such-message-queue")

By default, messages will be queued up until a connection is available.

Consuming using Akka streams

(this example uses op-rabbit-play-json and op-rabbit-akka-streams)

import com.spingo.op_rabbit._
import com.spingo.op_rabbit.PlayJsonSupport._
implicit val workFormat = Json.format[Work] // setup play-json serializer

lazy val subscription = Subscription(
  new QueueBinding("such-queue", durable = true, exclusive = false, autoDelete = false),
  RabbitSource[Work](name = "very-stream", qos = qos)) // marshalling is automatically hooked up using implicits

rabbitMq ! subscription

Source(subscription.consumer).
  to(Sink.foreach {
    case (ackPromise, work) =>
      doWork(work)
      ackPromise.success() // fulfilling the promise causes the message to be acknowledge and removed from the queue
  })
  .run

Publishing using Akka streams

(this example uses op-rabbit-play-json and op-rabbit-akka-streams)

import com.spingo.op_rabbit._
import com.spingo.op_rabbit.PlayJsonSupport._
implicit val workFormat = Format[Work] // setup play-json serializer

val sink = RabbitSink[Work](
  "my-sink-name",
  rabbitMq,
  GuaranteedPublishedMessage(QueuePublisher("such-queue")))

Source(1 to 15).
  map { i => (Promise[Unit], i) }.  // each promise will be completed by the sink when message delivery occurs
  to(sink)
  .run

If you can see the pattern here, combining an akka-stream rabbitmq consumer and publisher allows for guaranteed at-least-once message delivery from head to tail; in other words, don't acknowledge the original message until any and all side-effect events have been published and persisted.

Error notification

It's important to know when your consumers fail. Out of the box, op-rabbit ships with support for logging to logback (and therefore syslog), and also airbrake via op-rabbit-airbrake. Without any additional signal provided by you, logback will be used, making error visibility a default.

You can report errors to multiple sources by combining error logging strategies; for example, if you'd like to report to both logback and to airbrake, import / set the following implicit RabbitErrorLogging in the scope where your consumer is instantiated:

import com.spingo.op_rabbit.{LogbackLogger, RabbitControl}

implicit val rabbitErrorLogging = LogbackLogger + AirbrakeLogger.fromConfig

Implementing your own error reporting strategy is simple; here's the source code for the LogbackLogger:

object LogbackLogger extends RabbitErrorLogging {
  def apply(name: String, message: String, exception: Throwable, consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
    val logger = LoggerFactory.getLogger(name)
    logger.error(s"${message}. Body=${bodyAsString(body, properties)}. Envelope=${envelope}", exception)
  }
}

Credits

This library builds upon the excellent Akka RabbitMQ client by Yaroslav Klymko.