Scalang is a message passing and actor library that allows Scala and Erlang applications to easily communicate. Scalang includes a full implementation of the Erlang distributed node protocol. It provides an actor oriented API that can be used to interact with Erlang nodes in an idiomatic, OTP compliant way. Scalang is built on Netty for its networking layer and Jetlang for its actor implementation.
From Maven:
<repositories>
<repository>
<id>Boundary Public Repo</id>
<url>http://maven.boundary.com/artifactory/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.boundary</groupId>
<artifactId>scalang-scala_2.9.1</artifactId>
<version>0.18</version>
</dependency>
</dependencies>
From SBT:
val boundaryPublic = "Boundary Public Repo" at "http://maven.boundary.com/artifactory/repo"
val scalang = "com.boundary" %% "scalang" % "0.18"
Scalang currently supports Scala 2.9.1. Scalang also requires epmd in order to run.
The main entry point for Scalang’s API is the node class. An instance of the Node class is a self contained representation of an Erlang Node, distinct in the set of nodes participating in a distributed system. Multiple nodes may be run within the same JVM. Start a node with the following invocation:
val node = Node("[email protected]", "cookie")
Starting a Scalang node like this will register it with the local epmd instance, which must be running on the local host. Scalang will then be available to receive and make connections to other Erlang nodes. In this case, its node name would be [email protected] and its magic cookie is cookie
. Any Erlang or Scalang node which shares this magic cookie can now connect to this node and send messages using the node name.
Scalang shares Erlang’s concept of a process, a lightweight actor that is capable of sending messages to other processes, either local or remote. You can define your own processes by subclassing the Process class. The following code defines and then spawns a process:
class MyProcess(ctx : ProcessContext) extends Process(ctx) {
override def onMessage(msg : Any) {
log.info("received %s", msg)
}
}
val pid = node.spawn[MyProcess]("my_process")
//send to the pid
node.send(pid, "hey there")
//send to the regname
node.send("my_process", "you wanna party?")
Message passing / receiving is the main means by which processes may interact with the outside world. Processes receive messages via the message handler method onMessage. Only one message may be handled by a process at a time unless that process has been spawned with the reentrant option.
The following shows a simple echo server and client that demonstrate message sending.
class EchoServer(ctx : ProcessContext) extends Process(ctx) {
override def onMessage(msg : Any) = msg match {
case (pid : Pid, request : Any) =>
pid ! request
case m =>
log.error("sorry I don't understand %s.", m)
}
}
val server = node.spawn[EchoServer]("echo_server")
val client = node.spawn { mbox =>
mbox.send(server, (client,'derp))
val received = mbox.receive
println("received " + received)
}
Messages can also easily be passed from a remote node. Scalang supports Erlang’s convention of addressing messages to the tuple of a registered name and a node name.
val remoteNode = Node("remote", cookie)
val client = node.spawn { mbox =>
mbox.send(('echo_server, Symbol("test@localhost")), "heyo!")
}
The above code will send a message to the process registered as “echo_server” on the node named “test@localhost”.
Scalang implements the Erlang concept of links. A link is a bidirectional relationship between two processes. If one of the processes exits the link will break and the other process will receive an exit notification. The default behavior of a process during exit notification is for the receiving process to exit with the same error message that was delivered with the link breakage. Creating a link between two processes requires both pids.
Processes that must implement custom behavior may override the trapExit method.
class ExitHandler(ctx : ProcessContext) extends Process(ctx) {
override def onMessage(msg : Any) = msg match {
case _ => log.info("derp %s", msg)
}
override def trapExit(from : Pid, msg : Any) {
log.warning("got exit notification from %s reason %s", from, msg)
}
}
Scalang messages are serialized into Erlang’s external term format. Serialization automatically happens when messages are either sent from or received by a Scalang process. For the most part Scalang provides a 1-1 mapping of Erlangs terms onto Scala types. The type mappings are illustrated below.
From Erlang | To Scala |
Small Integer | Int |
Integer | Int |
Float | Double |
Boolean | Boolean |
Atom | Symbol |
Reference | Reference |
Port | Port |
Pid | Pid |
Small Tuple | Tuple |
Large Tuple | BigTuple |
String | String |
List | List |
Binary | ByteBuffer |
Small Bignum | Long |
Large Bignum | BigInt |
Fun | Fun |
Bistring | Bitstring |
From Scala | To Erlang |
Byte | Small Integer |
Int | Integer |
Long | Small Bignum |
Double | Float |
Symbol | Atom |
Reference | Reference |
Port | Port |
Pid | Pid |
Fun | Fun |
String | String |
List | List |
BigInteger | Large Bignum |
Array[Byte] | Binary |
ByteBuffer | Binary |
BitString | Bitstring |
Tuple | Tuple |
BigTuple | Tuple |
Sometimes the built-in type mappings in Scalang are not sufficient for an application’s message format. Scalang provides the TypeFactory trait for client code to provide custom decoding behavior. A TypeFactory is invoked when Scalang comes across a term that looks like an erlang record: a tuple where the first element is an atom. The createType method is called with the first tuple element as the name and the arity of the tuple.
object StructFactory extends TypeFactory {
def createType(name : Symbol, arity : Int, reader : TermReader) : Option[Seq[Any]] = {
try {
reader.mark
(name,arity) match {
case ('struct,2) => Some(readMap(reader))
case _ => None
}
}
}
protected def readSeq(reader : TermReader) : Map[Symbol,Any] = {
val proplist = reader.readAs[List[Symbol,Any]]
proplist.toMap
}
}
val node = Node("test", cookie, NodeConfig(
typeFactory = StructFactory))
The above code will spawn a Scalang node that uses the StructFactory singleton to decode into a map any arity 2 tuples that begin with the atom struct
. Anything else will get decoded with the normal type mappings.
A more complex example is the CaseClassFactory. It will attempt to decode Erlang records into Scala case classes reflectively.
Most modern Erlang applications are built using the OTP framework, and in particular the gen_server. In order to more effectively interface with gen_server based processes, Scalang has a special kind of process known as a service. Services respond to casts and calls like a gen_server and allow you to send casts and calls to gen_servers running in an Erlang VM.
case class EchoServiceArgs(name : String)
class EchoService(ctx : ServiceContext[EchoServiceArgs]) extends Service(ctx) {
val EchoServiceArgs(name) = ctx.args
override def handleCall(tag: (Pid, Reference), request: Any): Any = {
name + " " + request
}
override def handleCast(request : Any) {
log.info("Can't echo a cast. %s", request)
}
override def handleInfo(request : Any) {
log.info("A wild message appeared. %s, request")
}
}
val node = Node("test", cookie)
val pid = node.spawnService[EchoService, EchoServiceArgs]("echo", EchoServiceArgs("test_echo"))
This will spawn a new process with the registered name “echo” and it will be initialized with the argument “test_echo”. The handleCall method will automatically send its return value to the caller. Casts are meant to be one directional, therefore the return value of handleCast is discarded. The handleInfo method is invoked when a message shows up without the appropriate call or cast semantics.
Scalang uses the same version of Erlang’s node protocol as JInterface. This means that for some operations Scalang nodes are not treated as full fledged members of the cluster. For instance, features like monitors and atom caches are not currently supported. Additionally, Scalang nodes will appear as hidden nodes. This is disadvantageous for Scalang’s goal of transparent interoperability.
Scalang needs its own implementation of Erlang OTP’s supervision tree. All of the primitives are in place to support supervision trees currently, so an implementation of the actual supervisor process is all that’s needed.
Scalang uses Jetlang’s thread pool based actor implementation. There is currently no API for pre-empting these types of Jetlang actors. Therefore either Jetlang needs to be patched to allow this or another actor backend needs to be chosen.