Skip to content

Commit

Permalink
Add initial RxJava2 bus impl
Browse files Browse the repository at this point in the history
  • Loading branch information
graemerocher committed Mar 27, 2017
1 parent ca8b3cc commit 42b2c1d
Show file tree
Hide file tree
Showing 17 changed files with 322 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,19 @@ class ActorEventBus extends AbstractEventBus implements Closeable {
for(reg in all.get(msg.id)) {
Closure listener = reg.listener
def data = msg.data
if(data.getClass().isArray() && reg.argCount == ((Object[])data).length) {
replyIfExists listener.call(*data)
}
else {
replyIfExists listener.call(data)
try {
def result
if(data.getClass().isArray() && reg.argCount == ((Object[])data).length) {
result = listener.call(*data)
}
else {
result = listener.call(data)
}
replyIfExists(result)
} catch (Throwable e) {
replyIfExists(e)
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ActorEventBusSpec extends Specification {
sleep(500)

expect:
result == null
result instanceof Throwable
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@ class RxEventBus extends AbstractEventBus {
reply = eventWithReply.reply
}

def result
if(data.getClass().isArray() && argCount == ((Object[])data).length) {
result = invokeListener(listener, data)
}
else {
result = listener.call(data)
}
if(reply != null) {
reply.call(result)
try {
def result
if(data.getClass().isArray() && argCount == ((Object[])data).length) {
result = invokeListener(listener, data)
}
else {
result = listener.call(data)
}
if(reply != null) {
reply.call(result)
}
} catch (Throwable e) {
if(reply != null && reply.parameterTypes && reply.parameterTypes[0].isInstance(e)) {
reply.call(e)
}
else {
throw e
}
}

} as Action1, { Throwable t ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ class RxEventBusSpec extends Specification {
}

expect:
result == null
result instanceof Throwable
}
}
4 changes: 4 additions & 0 deletions grails-events-rxjava2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
compile project(":grails-events")
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.grails.async.events.rxjava2


import grails.async.events.Event
import groovy.transform.CompileStatic

/**
* An event with a reply
*
* @since 3.3
* @author Graeme Rocher
*/
@CompileStatic
class EventWithReply {
final Event event
final Closure reply

EventWithReply(Event event, Closure reply) {
this.event = event
this.reply = reply
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package org.grails.async.events.rxjava2

import grails.async.events.Event
import grails.async.events.emitter.EventEmitter
import grails.async.events.registry.EventRegistry
import grails.async.events.registry.Subscription
import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.functions.Consumer
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import org.grails.async.events.bus.AbstractEventBus

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

/**
* An EventBus implementation that uses RxJava
*
* @author Graeme Rocher
* @since 3.3
*
*/
@CompileStatic
@Slf4j
class RxEventBus extends AbstractEventBus {
protected final Map<CharSequence, PublishSubject> subjects = new ConcurrentHashMap<CharSequence, PublishSubject>().withDefault {
PublishSubject.create()
}
protected final Map<CharSequence, Collection<Disposable>> subscriptions = new ConcurrentHashMap<CharSequence, Collection<Disposable>>().withDefault {
new ConcurrentLinkedQueue<Disposable>()
}


final Scheduler scheduler

RxEventBus(Scheduler scheduler = Schedulers.io()) {
this.scheduler = scheduler
}

@Override
Subscription on(CharSequence event, Closure listener) {
String eventKey = event.toString()
int argCount = listener.parameterTypes?.length ?: 1
Disposable sub = subjects.get(eventKey)
.observeOn(scheduler)
.subscribe( { data ->

Closure reply = null
if(data instanceof EventWithReply) {
def eventWithReply = (EventWithReply) data
data = eventWithReply.event.data
reply = eventWithReply.reply
}

try {
def result
if(data.getClass().isArray() && argCount == ((Object[])data).length) {
result = invokeListener(listener, data)
}
else {
result = listener.call(data)
}
if(reply != null) {
reply.call(result)
}
} catch (Throwable e) {
if(reply != null && reply.parameterTypes && reply.parameterTypes[0].isInstance(e)) {
reply.call(e)
}
else {
throw e
}
}

} as Consumer, { Throwable t ->
log.error("Error occurred triggering event listener for event [$event]: ${t.message}", t)
} as Consumer<Throwable>)
Collection<Disposable> subs = subscriptions.get(eventKey)
subs.add(sub)
return new RxSubscription(sub, subs)
}

@CompileDynamic
protected Object invokeListener(Closure listener, data) {
listener.call(*data)
}

@Override
EventRegistry unsubscribeAll(CharSequence event) {
String eventKey = event.toString()
Collection<Disposable> subs = subscriptions.get(eventKey)
for(sub in subs) {
if(!sub.isDisposed()) {
sub.dispose()
}
}
subs.clear()
return this
}

@Override
EventEmitter notify(Event event) {
PublishSubject sub = subjects.get(event.id)
if(sub.hasObservers() && !sub.hasComplete()) {
sub.onNext(event.data)
}
return this
}

@Override
EventEmitter sendAndReceive(Event event, Closure reply) {
PublishSubject sub = subjects.get(event.id)
if(sub.hasObservers() && !sub.hasComplete()) {
sub.onNext(new EventWithReply(event, reply))
}
return this
}

private static class RxSubscription implements Subscription {
final Disposable subscription
final Collection<Disposable> subscriptions

RxSubscription(Disposable subscription, Collection<Disposable> subscriptions) {
this.subscription = subscription
this.subscriptions = subscriptions
}

@Override
Subscription cancel() {
if(!subscription.isDisposed()) {
subscription.dispose()
}
subscriptions.remove(subscription)
return this
}

@Override
boolean isCancelled() {
return subscription.isDisposed()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.grails.async.events.rxjava2.RxEventBus
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.grails.async.events.rxjava2

import io.reactivex.schedulers.Schedulers
import spock.lang.Specification

/**
* Created by graemerocher on 27/03/2017.
*/
class RxEventBusSpec extends Specification {
void 'test rx event bus single arg'() {
given:
RxEventBus eventBus = new RxEventBus(Schedulers.trampoline())
def result
eventBus.on("test") {
result = "foo $it"
}
eventBus.notify("test", "bar")

expect:
result == 'foo bar'
}

void 'test rx event bus multiple args'() {
given:
RxEventBus eventBus = new RxEventBus(Schedulers.trampoline())
def result
eventBus.on("test") {
result = "foo $it"
}
eventBus.notify("test", "bar", "baz")

expect:
result == 'foo [bar, baz]'
}

void 'test rx event bus multiple args listener'() {
given:
RxEventBus eventBus = new RxEventBus(Schedulers.trampoline())
def result
eventBus.on("test") { String one, String two ->
result = "foo $one $two"
}
eventBus.notify("test", "bar", "baz")

expect:
result == 'foo bar baz'
}

void 'test rx event bus send and receive'() {
given:
RxEventBus eventBus = new RxEventBus(Schedulers.trampoline())
def result
eventBus.on("test") { String data ->
"foo $data"
}
eventBus.sendAndReceive("test", "bar") {
result = it
}
expect:
result == 'foo bar'
}

void 'test rx event bus error handling'() {
given:
RxEventBus eventBus = new RxEventBus(Schedulers.trampoline())
def result
eventBus.on("test") { String data ->
throw new RuntimeException("bad")
}
eventBus.sendAndReceive("test", "bar") {
result = it
}

expect:
result instanceof Throwable
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import grails.async.events.registry.EventRegistry
import grails.async.events.registry.Subscription
import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.grails.async.events.bus.AbstractEventBus
import org.grails.async.events.DefaultSubscription
import org.springframework.context.ApplicationListener
Expand Down Expand Up @@ -63,6 +64,7 @@ class SpringEventBus extends AbstractEventBus {
return this
}

@Slf4j
private static class EventBusListener implements ApplicationListener<SpringEventBusEvent> {
final Map<CharSequence, Collection<DefaultSubscription>> registrations

Expand All @@ -74,22 +76,29 @@ class SpringEventBus extends AbstractEventBus {
@CompileDynamic
void onApplicationEvent(SpringEventBusEvent event) {
Event e = event.source
Closure replyTo = event.replyTo
Closure reply = event.replyTo
def data = e.data

for(reg in registrations.get(e.id)) {
Closure listener = reg.listener
boolean isSpread = data.getClass().isArray() && reg.argCount == ((Object[]) data).length
if(isSpread) {
def result = listener.call(*data)
if(replyTo != null) {
replyTo.call(*result)
try {
boolean isSpread = data.getClass().isArray() && reg.argCount == ((Object[]) data).length
if(isSpread) {
def result = listener.call(*data)
if(reply != null) {
reply.call(*result)
}
}
}
else {
def result = listener.call(data)
if(replyTo != null) {
replyTo.call(result)
else {
def result = listener.call(data)
if(reply != null) {
reply.call(result)
}
}
} catch (Throwable t) {
log.error("Error occurred triggering event listener for event [$event]: ${t.message}", t)
if(reply != null && reply.parameterTypes && reply.parameterTypes[0].isInstance(t)) {
reply.call(t)
}
}
}
Expand Down

This file was deleted.

Loading

0 comments on commit 42b2c1d

Please sign in to comment.