Skip to content

Commit

Permalink
Merge pull request #4 from daveray/static-core-clj
Browse files Browse the repository at this point in the history
Static core clj
  • Loading branch information
mattrjacobs committed Aug 30, 2013
2 parents 39239fa + 4fa627d commit d2a00ff
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 133 deletions.
52 changes: 44 additions & 8 deletions language-adaptors/rxjava-clojure/README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,57 @@
# Clojure Adaptor for RxJava

This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces.

This adaptor allows 'fn' functions to be used and RxJava will know how to invoke them.
# Basic Usage

This enables code such as:
## Requiring the interop namespace
The first thing to do is to require the namespace:

```clojure
(->
(Observable/toObservable ["one" "two" "three"])
(.take 2)
(.subscribe (fn [arg] (println arg))))
(ns my.namespace
(:require [rx.lang.clojure.interop :as rx])
(:import [rx Observable]))
```

This still dependes on Clojure using Java interop against the Java API.
or, at the REPL:

A future goal is a Clojure wrapper to expose the functions in a more idiomatic way.
```clojure
(require '[rx.lang.clojure.interop :as rx])
```

## Using rx/fn
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:

```clojure
(-> my-observable
(.map (rx/fn [v] (* 2 v))))
```

If you already have a plain old Clojure function you'd like to use, you can pass it to the `rx/fn*` function to get a new object that implements `rx.util.functions.Func`:

```clojure
(-> my-numbers
(.reduce (rx/fn* +)))
```

## Using rx/action
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:

```clojure
(-> my-observable
(.map (rx/fn* transform-data))
(.finallyDo (rx/action [] (println "Finished transform")))
(.subscribe (rx/action [v] (println "Got value" v))
(rx/action [e] (println "Get error" e))
(rx/action [] (println "Sequence complete"))))
```

# Gotchas
Here are a few things to keep in mind when using this interop:

* Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts
* If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose.
* Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`.

# Binaries

Expand Down
9 changes: 3 additions & 6 deletions language-adaptors/rxjava-clojure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ apply plugin: 'osgi'
dependencies {
compile project(':rxjava-core')

provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'

// clojure
compile 'org.clojure:clojure:1.4.+'
compile 'clj-http:clj-http:0.6.4' // https://clojars.org/clj-http
//compile 'clj-http:clj-http:0.6.4' // https://clojars.org/clj-http
}

/*
Expand All @@ -20,7 +17,7 @@ warnOnReflection = true

buildscript {
repositories { maven { url "http://clojars.org/repo" } }
dependencies { classpath "clojuresque:clojuresque:1.5.4" }
dependencies { classpath "clojuresque:clojuresque:1.5.8" }
}

repositories {
Expand Down Expand Up @@ -52,4 +49,4 @@ jar {
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
(ns rx.lang.clojure.examples.http-examples
(:require [rx.lang.clojure.interop :as rx]
[clj-http.client :as http])
(:import rx.Observable rx.subscriptions.Subscriptions))

; NOTE on naming conventions. I'm using camelCase names (against clojure convention)
; in this file as I'm purposefully keeping functions and methods across
; different language implementations in-sync for easy comparison.

(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.

return Observable<String> of HTML"
(Observable/create
(rx/fn [observer]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create (rx/action [] (future-cancel f)))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe (rx/action [v] (println "--- Article ---\n" (subs (:body v) 0 125) "...")))))


; --------------------------------------------------
; Error Handling
; --------------------------------------------------

(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously
with proper error handling.

return Observable<String> of HTML"
(Observable/create
(rx/fn [observer]
(let [f (future
(try
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
;(catch Exception e (prn "exception")))
(catch Exception e (-> observer (.onError e))))
; after sending response to onNext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create (rx/action [] (future-cancel f)))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"])
(.subscribe (rx/action [v] (println "--- Article ---\n" (subs (:body v) 0 125) "..."))
(rx/action [e] (println "--- Error ---\n" (.getMessage e))))))


Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns rx.lang.clojure.examples.rx-examples
(:import rx.Observable rx.subscriptions.Subscriptions)
(:require [clj-http.client :as http]))
(:require [rx.lang.clojure.interop :as rx])
(:import rx.Observable rx.subscriptions.Subscriptions))

; NOTE on naming conventions. I'm using camelCase names (against clojure convention)
; in this file as I'm purposefully keeping functions and methods across
Expand All @@ -12,8 +12,8 @@

(defn hello
[& args]
(-> (Observable/toObservable args)
(.subscribe #(println (str "Hello " % "!")))))
(-> (Observable/from args)
(.subscribe (rx/action [v] (println (str "Hello " v "!"))))))

; To see output
(comment
Expand All @@ -23,22 +23,13 @@
; Create Observable from Existing Data
; --------------------------------------------------

(defn existingDataFromNumbers []
(Observable/toObservable [1 2 3 4 5 6]))

(defn existingDataFromNumbersUsingFrom []
(Observable/from [1 2 3 4 5 6]))

(defn existingDataFromObjects []
(Observable/toObservable ["a" "b" "c"]))

(defn existingDataFromObjectsUsingFrom []
(Observable/from ["a" "b" "c"]))

(defn existingDataFromList []
(let [list [5, 6, 7, 8]]
(Observable/toObservable list)))

(defn existingDataFromListUsingFrom []
(let [list [5, 6, 7, 8]]
(Observable/from list)))
Expand All @@ -56,7 +47,7 @@

returns Observable<String>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(doseq [x (range 50)] (-> observer (.onNext (str "value_" x))))
; after sending all values we complete the sequence
(-> observer .onCompleted)
Expand All @@ -66,46 +57,26 @@

; To see output
(comment
(.subscribe (customObservableBlocking) println))
(.subscribe (customObservableBlocking) (rx/action* println)))

(defn customObservableNonBlocking []
"This example shows a custom Observable that does not block
when subscribed to as it spawns a separate thread.

returns Observable<String>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(doseq [x (range 50)]
(-> observer (.onNext (str "anotherValue_" x))))
; after sending all values we complete the sequence
(-> observer .onCompleted))]
; return a subscription that cancels the future
(Subscriptions/create #(future-cancel f))))))

; To see output
(comment
(.subscribe (customObservableNonBlocking) println))


(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.

return Observable<String> of HTML"
(Observable/create
(fn [observer]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "..."))))
(.subscribe (customObservableNonBlocking) (rx/action* println)))


; --------------------------------------------------
Expand All @@ -119,8 +90,8 @@
(customObservableNonBlocking)
(.skip 10)
(.take 5)
(.map #(str % "_transformed"))
(.subscribe #(println "onNext =>" %))))
(.map (rx/fn [v] (str v "_transformed")))
(.subscribe (rx/action [v] (println "onNext =>" v)))))

; To see output
(comment
Expand All @@ -136,7 +107,7 @@

return Observable<Map>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(try
; simulate fetching user data via network service call with latency
Expand All @@ -147,14 +118,14 @@
(-> observer .onCompleted)
(catch Exception e (-> observer (.onError e))))) ]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))

(defn getVideoBookmark [userId, videoId]
"Asynchronously fetch bookmark for video

return Observable<Integer>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(try
; simulate fetching user data via network service call with latency
Expand All @@ -165,13 +136,13 @@
(-> observer .onCompleted)
(catch Exception e (-> observer (.onError e)))))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))

(defn getVideoMetadata [videoId, preferredLanguage]
"Asynchronously fetch movie metadata for a given language
return Observable<Map>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(try
; simulate fetching video data via network service call with latency
Expand All @@ -190,7 +161,7 @@
(-> observer .onCompleted)
(catch Exception e (-> observer (.onError e))))) ]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))


(defn getVideoForUser [userId videoId]
Expand All @@ -200,24 +171,24 @@
- user data
return Observable<Map>"
(let [user-observable (-> (getUser userId)
(.map (fn [user] {:user-name (:name user)
(.map (rx/fn [user] {:user-name (:name user)
:language (:preferred-language user)})))
bookmark-observable (-> (getVideoBookmark userId videoId)
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
(.map (rx/fn [bookmark] {:viewed-position (:position bookmark)})))
; getVideoMetadata requires :language from user-observable so nest inside map function
video-metadata-observable (-> user-observable
(.mapMany
; fetch metadata after a response from user-observable is received
(fn [user-map]
(rx/fn [user-map]
(getVideoMetadata videoId (:language user-map)))))]
; now combine 3 async sequences using zip
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
(fn [bookmark-map metadata-map user-map]
(rx/fn [bookmark-map metadata-map user-map]
{:bookmark-map bookmark-map
:metadata-map metadata-map
:user-map user-map}))
; and transform into a single response object
(.map (fn [data]
(.map (rx/fn [data]
{:video-id videoId
:video-metadata (:metadata-map data)
:user-id userId
Expand All @@ -231,37 +202,7 @@
(comment
(-> (getVideoForUser 12345 78965)
(.subscribe
(fn [x] (println "--- Object ---\n" x))
(fn [e] (println "--- Error ---\n" e))
(fn [] (println "--- Completed ---")))))


; --------------------------------------------------
; Error Handling
; --------------------------------------------------

(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously
with proper error handling.

return Observable<String> of HTML"
(Observable/create
(fn [observer]
(let [f (future
(try
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
;(catch Exception e (prn "exception")))
(catch Exception e (-> observer (.onError e))))
; after sending response to onNext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")
#(println "--- Error ---\n" (.getMessage %)))))

(rx/action [x] (println "--- Object ---\n" x))
(rx/action [e] (println "--- Error ---\n" e))
(rx/action [] (println "--- Completed ---")))))

Loading

0 comments on commit d2a00ff

Please sign in to comment.