This project aims to provide type-safe communication over Kafka, using protobuf as the wire format. It leverages protobuf's service definitions, ScalaPB to generate Scala code, and Li Haoyi's amazing autowire library.
Setup works as follows, assuming you have defined this protocol:
import "net/cakesolutions/kafkawire/options.proto";
message Foo {
required string name = 1;
}
message Bar {
required string greeting = 1;
}
service FooBarService {
rpc greet (Foo) returns (Bar);
}
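For orientation, the Scala code generated from this protocol has roughly the shape sketched below. This is a simplified, hand-written approximation inferred from how `Foo`, `Bar` and `FooBarService` are used in the rest of this README. The real ScalaPB output also carries serialization machinery and will differ in detail.

```scala
import scala.concurrent.Future

// Simplified stand-ins for the ScalaPB-generated message classes.
// (Assumption: the real generated classes also extend ScalaPB base
// traits and provide protobuf serialization.)
final case class Foo(name: String)
final case class Bar(greeting: String)

// Simplified stand-in for the generated service trait: one method per
// `rpc` entry in the protocol, returning the reply asynchronously.
trait FooBarService {
  def greet(request: Foo): Future[Bar]
}
```

Both the service side and the client side program against this trait, which is what lets the compiler check every call.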
####Service side (consumer) :
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import net.cakesolutions.kafkawire.server.{KafkaServiceActor, KafkaWireRouter}
val config = ConfigFactory.parseString {
s"""
| bootstrap.servers = "address:port"
| group.id = "group"
| enable.auto.commit = false
| auto.offset.reset = "earliest"
|
| schedule.interval = 3000 milliseconds
| unconfirmed.timeout = 3000 milliseconds
| buffer.size = 8
""".stripMargin
}
val serviceSystem : ActorSystem = ???
//The FooBarService trait is generated by ScalaPB, we provide an implementation for it
//that contains the business logic.
val implementation: FooBarService = new FooBarService {
override def greet(request: Foo): Future[Bar] = Future.successful {
Bar(s"greetings ${request.name}")
}
}
//Define a router by overriding the router method with a call to the route macro.
val router: KafkaWireRouter = new KafkaWireRouter {
override def router: Router = this.route[FooBarService](implementation)
}
//Pass the router to a KafkaServiceActor, which will use it to automatically process
//the incoming messages and dispatch them to the relevant method in the implementation.
serviceSystem.actorOf(KafkaServiceActor.props(config, router))
####Client side (producer) :
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import net.cakesolutions.kafkawire.client.KafkaWireClient
val config = ConfigFactory.parseString {
s"""
| bootstrap.servers = "address:port"
| group.id = "group"
| enable.auto.commit = false
| auto.offset.reset = "earliest"
|
| schedule.interval = 3000 milliseconds
| unconfirmed.timeout = 3000 milliseconds
| buffer.size = 8
""".stripMargin
}
val clientSystem : ActorSystem = ???
//CLIENT SETUP
val kafkaWireClient = new KafkaWireClient(clientSystem, config)
import autowire._ //This gives us the ability to use `call` that will create a call to the service.
val result: Future[Bar] = kafkaWireClient[FooBarService].greet(Foo("John")).call()
/// ... DO STUFF ...
//Closing the kafkaProducer
kafkaWireClient.kafkaProducer.close()
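The `result` value above is an ordinary Scala `Future`, so it can be transformed and awaited with the standard library. The sketch below is self-contained and does not touch Kafka: `Bar` is a hand-written stand-in for the generated class, and `result` is a pre-completed future standing in for the real `call()`. The five-second timeout is an arbitrary choice for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ConsumeReply {
  // Stand-in for the ScalaPB-generated message class.
  final case class Bar(greeting: String)

  // Stand-in for kafkaWireClient[FooBarService].greet(Foo("John")).call()
  val result: Future[Bar] = Future.successful(Bar("greetings John"))

  // Transform the reply without blocking, and turn a failed call
  // into a readable message instead of an exception.
  val greeting: Future[String] =
    result
      .map(_.greeting)
      .recover { case error => s"call failed: ${error.getMessage}" }

  def main(args: Array[String]): Unit = {
    // Blocking is only acceptable here because this is a demo entry point;
    // Await.result throws a TimeoutException if no reply arrives in time.
    println(Await.result(greeting, 5.seconds))
  }
}
```

In application code it is usually preferable to keep composing futures rather than block, and to close the producer only once all outstanding calls have completed.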
With minimal boilerplate, this gives us type-safe RPC calls over Kafka. Because the services are defined as part of the protocol, there is little room for error when handling messages that travel over Kafka. If the protocol evolves in a breaking way, the code will simply stop compiling.
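To make the idea of RPC over a message log more concrete, here is a minimal sketch of one generic way to do it: each request carries a correlation id, and the client completes the matching `Promise` when a reply with that id comes back. This is not the library's implementation, which this README does not describe. `Request`, `Reply`, `pending` and `serve` are hypothetical names, the message classes are hand-written stand-ins, and a direct method call replaces the Kafka topics.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hand-written stand-ins for the generated protocol classes.
final case class Foo(name: String)
final case class Bar(greeting: String)
trait FooBarService { def greet(request: Foo): Future[Bar] }

// Hypothetical envelopes that would travel over the request and reply topics.
final case class Request(correlationId: String, method: String, payload: Foo)
final case class Reply(correlationId: String, payload: Bar)

object RpcSketch {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Service side: the business logic, plus dispatch by method name.
  val implementation: FooBarService = new FooBarService {
    def greet(request: Foo): Future[Bar] =
      Future.successful(Bar(s"greetings ${request.name}"))
  }

  def serve(request: Request): Future[Reply] = request.method match {
    case "greet" =>
      implementation.greet(request.payload).map(Reply(request.correlationId, _))
    case other =>
      Future.failed(new IllegalArgumentException(s"unknown method: $other"))
  }

  // Client side: calls waiting for a reply, keyed by correlation id.
  val pending = TrieMap.empty[String, Promise[Bar]]

  val client: FooBarService = new FooBarService {
    def greet(request: Foo): Future[Bar] = {
      val id = UUID.randomUUID().toString
      val promise = Promise[Bar]()
      pending.put(id, promise)
      // With Kafka, this would be a produce to a request topic and a
      // consume from a reply topic; here the service is called directly.
      serve(Request(id, "greet", request)).onComplete {
        case Success(reply) =>
          pending.remove(reply.correlationId).foreach(_.success(reply.payload))
        case Failure(error) =>
          pending.remove(id).foreach(_.failure(error))
      }
      promise.future
    }
  }

  def main(args: Array[String]): Unit =
    println(Await.result(client.greet(Foo("John")), 5.seconds))
}
```

The caller only ever sees the `FooBarService` trait, so the envelope and the correlation bookkeeping stay out of application code.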
This is a proof-of-concept library. For now it implements RPC over Kafka, because autowire provides an abstraction for RPC. However, we are planning to extend autowire so that clients which do not necessarily expect a reply from a service can be written, allowing them to send events over the wire in a type-safe manner without any guarantee that a consumer will pick up these events.