From 4fa627d6b6ef50f459cef50397af8d873468cfa4 Mon Sep 17 00:00:00 2001 From: Dave Ray Date: Thu, 29 Aug 2013 22:55:22 -0700 Subject: [PATCH] Update rxjava-clojure adaptor. Added rx.lang.clojure.interop namespace with fn and action macros. Updated examples. Updated README. --- language-adaptors/rxjava-clojure/README.md | 52 +++++++-- language-adaptors/rxjava-clojure/build.gradle | 9 +- .../lang/clojure/examples/http_examples.txt | 58 ++++++++++ .../rx/lang/clojure/examples/rx_examples.txt | 109 ++++-------------- .../lang/clojure/examples/video_example.txt | 50 ++++---- .../rx/lang/clojure/DummyClojureClass.clj | 6 - .../main/clojure/rx/lang/clojure/interop.clj | 98 ++++++++++++++++ .../clojure/rx/lang/clojure/interop_test.clj | 88 ++++++++++++++ .../rx/lang/clojure/observable_tests.clj | 7 -- 9 files changed, 344 insertions(+), 133 deletions(-) create mode 100644 language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/http_examples.txt delete mode 100644 language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj create mode 100644 language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/interop.clj create mode 100644 language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj delete mode 100644 language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj diff --git a/language-adaptors/rxjava-clojure/README.md b/language-adaptors/rxjava-clojure/README.md index 35faeab8ba..bb452b98a3 100644 --- a/language-adaptors/rxjava-clojure/README.md +++ b/language-adaptors/rxjava-clojure/README.md @@ -1,21 +1,57 @@ # Clojure Adaptor for RxJava +This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces. -This adaptor allows 'fn' functions to be used and RxJava will know how to invoke them. +# Basic Usage -This enables code such as: +## Requiring the interop namespace +The first thing to do is to require the namespace: ```clojure -(-> - (Observable/toObservable ["one" "two" "three"]) - (.take 2) - (.subscribe (fn [arg] (println arg)))) +(ns my.namespace + (:require [rx.lang.clojure.interop :as rx]) + (:import [rx Observable])) ``` -This still dependes on Clojure using Java interop against the Java API. +or, at the REPL: -A future goal is a Clojure wrapper to expose the functions in a more idiomatic way. +```clojure +(require '[rx.lang.clojure.interop :as rx]) +``` + +## Using rx/fn +Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`: + +```clojure +(-> my-observable + (.map (rx/fn [v] (* 2 v)))) +``` + +If you already have a plain old Clojure function you'd like to use, you can pass it to the `rx/fn*` function to get a new object that implements `rx.util.functions.Func`: + +```clojure +(-> my-numbers + (.reduce (rx/fn* +))) +``` + +## Using rx/action +The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts: + +```clojure +(-> my-observable + (.map (rx/fn* transform-data)) + (.finallyDo (rx/action [] (println "Finished transform"))) + (.subscribe (rx/action [v] (println "Got value" v)) + (rx/action [e] (println "Get error" e)) + (rx/action [] (println "Sequence complete")))) +``` + +# Gotchas +Here are a few things to keep in mind when using this interop: +* Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts +* If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose. +* Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`. # Binaries diff --git a/language-adaptors/rxjava-clojure/build.gradle b/language-adaptors/rxjava-clojure/build.gradle index fc8af32d0b..0aeba9680a 100644 --- a/language-adaptors/rxjava-clojure/build.gradle +++ b/language-adaptors/rxjava-clojure/build.gradle @@ -4,12 +4,9 @@ apply plugin: 'osgi' dependencies { compile project(':rxjava-core') - provided 'junit:junit-dep:4.10' - provided 'org.mockito:mockito-core:1.8.5' - // clojure compile 'org.clojure:clojure:1.4.+' - compile 'clj-http:clj-http:0.6.4' // https://clojars.org/clj-http + //compile 'clj-http:clj-http:0.6.4' // https://clojars.org/clj-http } /* @@ -20,7 +17,7 @@ warnOnReflection = true buildscript { repositories { maven { url "http://clojars.org/repo" } } - dependencies { classpath "clojuresque:clojuresque:1.5.4" } + dependencies { classpath "clojuresque:clojuresque:1.5.8" } } repositories { @@ -52,4 +49,4 @@ jar { instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' instruction 'Fragment-Host', 'com.netflix.rxjava.core' } -} \ No newline at end of file +} diff --git a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/http_examples.txt b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/http_examples.txt new file mode 100644 index 0000000000..c9c61f757d --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/http_examples.txt @@ -0,0 +1,58 @@ +(ns rx.lang.clojure.examples.http-examples + (:require [rx.lang.clojure.interop :as rx] + [clj-http.client :as http]) + (:import rx.Observable rx.subscriptions.Subscriptions)) + +; NOTE on naming conventions. I'm using camelCase names (against clojure convention) +; in this file as I'm purposefully keeping functions and methods across +; different language implementations in-sync for easy comparison. + +(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames] + "Fetch a list of Wikipedia articles asynchronously. + + return Observable of HTML" + (Observable/create + (rx/fn [observer] + (let [f (future + (doseq [articleName wikipediaArticleNames] + (-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName))))) + ; after sending response to onnext we complete the sequence + (-> observer .onCompleted))] + ; a subscription that cancels the future if unsubscribed + (Subscriptions/create (rx/action [] (future-cancel f))))))) + +; To see output +(comment + (-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) + (.subscribe (rx/action [v] (println "--- Article ---\n" (subs (:body v) 0 125) "..."))))) + + +; -------------------------------------------------- +; Error Handling +; -------------------------------------------------- + +(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames] + "Fetch a list of Wikipedia articles asynchronously + with proper error handling. + + return Observable of HTML" + (Observable/create + (rx/fn [observer] + (let [f (future + (try + (doseq [articleName wikipediaArticleNames] + (-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName))))) + ;(catch Exception e (prn "exception"))) + (catch Exception e (-> observer (.onError e)))) + ; after sending response to onNext we complete the sequence + (-> observer .onCompleted))] + ; a subscription that cancels the future if unsubscribed + (Subscriptions/create (rx/action [] (future-cancel f))))))) + +; To see output +(comment + (-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"]) + (.subscribe (rx/action [v] (println "--- Article ---\n" (subs (:body v) 0 125) "...")) + (rx/action [e] (println "--- Error ---\n" (.getMessage e)))))) + + diff --git a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.txt b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.txt index 943ecb2f17..8245840a6c 100644 --- a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.txt +++ b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.txt @@ -1,6 +1,6 @@ (ns rx.lang.clojure.examples.rx-examples - (:import rx.Observable rx.subscriptions.Subscriptions) - (:require [clj-http.client :as http])) + (:require [rx.lang.clojure.interop :as rx]) + (:import rx.Observable rx.subscriptions.Subscriptions)) ; NOTE on naming conventions. I'm using camelCase names (against clojure convention) ; in this file as I'm purposefully keeping functions and methods across @@ -12,8 +12,8 @@ (defn hello [& args] - (-> (Observable/toObservable args) - (.subscribe #(println (str "Hello " % "!"))))) + (-> (Observable/from args) + (.subscribe (rx/action [v] (println (str "Hello " v "!")))))) ; To see output (comment @@ -23,22 +23,13 @@ ; Create Observable from Existing Data ; -------------------------------------------------- -(defn existingDataFromNumbers [] - (Observable/toObservable [1 2 3 4 5 6])) (defn existingDataFromNumbersUsingFrom [] (Observable/from [1 2 3 4 5 6])) -(defn existingDataFromObjects [] - (Observable/toObservable ["a" "b" "c"])) - (defn existingDataFromObjectsUsingFrom [] (Observable/from ["a" "b" "c"])) -(defn existingDataFromList [] - (let [list [5, 6, 7, 8]] - (Observable/toObservable list))) - (defn existingDataFromListUsingFrom [] (let [list [5, 6, 7, 8]] (Observable/from list))) @@ -56,7 +47,7 @@ returns Observable" (Observable/create - (fn [observer] + (rx/fn [observer] (doseq [x (range 50)] (-> observer (.onNext (str "value_" x)))) ; after sending all values we complete the sequence (-> observer .onCompleted) @@ -66,7 +57,7 @@ ; To see output (comment - (.subscribe (customObservableBlocking) println)) + (.subscribe (customObservableBlocking) (rx/action* println))) (defn customObservableNonBlocking [] "This example shows a custom Observable that does not block @@ -74,38 +65,18 @@ returns Observable" (Observable/create - (fn [observer] + (rx/fn [observer] (let [f (future (doseq [x (range 50)] (-> observer (.onNext (str "anotherValue_" x)))) ; after sending all values we complete the sequence (-> observer .onCompleted))] ; return a subscription that cancels the future - (Subscriptions/create #(future-cancel f)))))) - -; To see output -(comment - (.subscribe (customObservableNonBlocking) println)) - - -(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames] - "Fetch a list of Wikipedia articles asynchronously. - - return Observable of HTML" - (Observable/create - (fn [observer] - (let [f (future - (doseq [articleName wikipediaArticleNames] - (-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName))))) - ; after sending response to onnext we complete the sequence - (-> observer .onCompleted))] - ; a subscription that cancels the future if unsubscribed - (Subscriptions/create #(future-cancel f)))))) + (Subscriptions/create (rx/action [] (future-cancel f))))))) ; To see output (comment - (-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) - (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))) + (.subscribe (customObservableNonBlocking) (rx/action* println))) ; -------------------------------------------------- @@ -119,8 +90,8 @@ (customObservableNonBlocking) (.skip 10) (.take 5) - (.map #(str % "_transformed")) - (.subscribe #(println "onNext =>" %)))) + (.map (rx/fn [v] (str v "_transformed"))) + (.subscribe (rx/action [v] (println "onNext =>" v))))) ; To see output (comment @@ -136,7 +107,7 @@ return Observable" (Observable/create - (fn [observer] + (rx/fn [observer] (let [f (future (try ; simulate fetching user data via network service call with latency @@ -147,14 +118,14 @@ (-> observer .onCompleted) (catch Exception e (-> observer (.onError e))))) ] ; a subscription that cancels the future if unsubscribed - (Subscriptions/create #(future-cancel f)))))) + (Subscriptions/create (rx/action [] (future-cancel f))))))) (defn getVideoBookmark [userId, videoId] "Asynchronously fetch bookmark for video return Observable" (Observable/create - (fn [observer] + (rx/fn [observer] (let [f (future (try ; simulate fetching user data via network service call with latency @@ -165,13 +136,13 @@ (-> observer .onCompleted) (catch Exception e (-> observer (.onError e)))))] ; a subscription that cancels the future if unsubscribed - (Subscriptions/create #(future-cancel f)))))) + (Subscriptions/create (rx/action [] (future-cancel f))))))) (defn getVideoMetadata [videoId, preferredLanguage] "Asynchronously fetch movie metadata for a given language return Observable" (Observable/create - (fn [observer] + (rx/fn [observer] (let [f (future (try ; simulate fetching video data via network service call with latency @@ -190,7 +161,7 @@ (-> observer .onCompleted) (catch Exception e (-> observer (.onError e))))) ] ; a subscription that cancels the future if unsubscribed - (Subscriptions/create #(future-cancel f)))))) + (Subscriptions/create (rx/action [] (future-cancel f))))))) (defn getVideoForUser [userId videoId] @@ -200,24 +171,24 @@ - user data return Observable" (let [user-observable (-> (getUser userId) - (.map (fn [user] {:user-name (:name user) + (.map (rx/fn [user] {:user-name (:name user) :language (:preferred-language user)}))) bookmark-observable (-> (getVideoBookmark userId videoId) - (.map (fn [bookmark] {:viewed-position (:position bookmark)}))) + (.map (rx/fn [bookmark] {:viewed-position (:position bookmark)}))) ; getVideoMetadata requires :language from user-observable so nest inside map function video-metadata-observable (-> user-observable (.mapMany ; fetch metadata after a response from user-observable is received - (fn [user-map] + (rx/fn [user-map] (getVideoMetadata videoId (:language user-map)))))] ; now combine 3 async sequences using zip (-> (Observable/zip bookmark-observable video-metadata-observable user-observable - (fn [bookmark-map metadata-map user-map] + (rx/fn [bookmark-map metadata-map user-map] {:bookmark-map bookmark-map :metadata-map metadata-map :user-map user-map})) ; and transform into a single response object - (.map (fn [data] + (.map (rx/fn [data] {:video-id videoId :video-metadata (:metadata-map data) :user-id userId @@ -231,37 +202,7 @@ (comment (-> (getVideoForUser 12345 78965) (.subscribe - (fn [x] (println "--- Object ---\n" x)) - (fn [e] (println "--- Error ---\n" e)) - (fn [] (println "--- Completed ---"))))) - - -; -------------------------------------------------- -; Error Handling -; -------------------------------------------------- - -(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames] - "Fetch a list of Wikipedia articles asynchronously - with proper error handling. - - return Observable of HTML" - (Observable/create - (fn [observer] - (let [f (future - (try - (doseq [articleName wikipediaArticleNames] - (-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName))))) - ;(catch Exception e (prn "exception"))) - (catch Exception e (-> observer (.onError e)))) - ; after sending response to onNext we complete the sequence - (-> observer .onCompleted))] - ; a subscription that cancels the future if unsubscribed - (Subscriptions/create #(future-cancel f)))))) - -; To see output -(comment - (-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"]) - (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...") - #(println "--- Error ---\n" (.getMessage %))))) - + (rx/action [x] (println "--- Object ---\n" x)) + (rx/action [e] (println "--- Error ---\n" e)) + (rx/action [] (println "--- Completed ---"))))) diff --git a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.txt b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.txt index 557e54aad6..3d376be4f4 100644 --- a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.txt +++ b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.txt @@ -1,5 +1,7 @@ (ns rx.lang.clojure.examples.video-example - (:import [rx Observable Observer Subscription] rx.subscriptions.Subscriptions)) + (:require [rx.lang.clojure.interop :as rx]) + (:import [rx Observable Observer Subscription] + rx.subscriptions.Subscriptions)) ; Adapted from language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy @@ -21,11 +23,11 @@ ; how progressive rendering could work (println "---- sequence of video dictionaries ----") (-> (get-video-grid-for-display 1) - (.subscribe #(locking print-lock (println %)) - #(locking print-lock (println "Error: " %)) - #(do - (println "Finished example 1") - (on-complete))))) + (.subscribe (rx/action [v] (locking print-lock (println v))) + (rx/action [v] (locking print-lock (println "Error: " v))) + (rx/action [] + (println "Finished example 1") + (on-complete))))) (defn example2 [on-complete] @@ -34,9 +36,9 @@ ; for document style responses (most webservices) (-> (get-video-grid-for-display 1) .toList - (.subscribe #(println "\n ---- single list of video dictionaries ----\n" %) - #(println "Error: " %) - #(do + (.subscribe (rx/action [v] (println "\n ---- single list of video dictionaries ----\n" v)) + (rx/action [v] (println "Error: " v)) + (rx/action [] (println "Finished Example 2") (println "Exiting") (on-complete))))) @@ -61,27 +63,27 @@ " [user-id] (-> (get-list-of-lists user-id) - (.mapMany (fn [list] + (.mapMany (rx/fn [list] ; for each VideoList we want to fetch the videos (-> (video-list->videos list) (.take 10) ; we only want the first 10 of each list - (.mapMany (fn [video] + (.mapMany (rx/fn [video] ; for each video we want to fetch metadata (let [m (-> (video->metadata video) - (.map (fn [md] + (.map (rx/fn [md] ; transform to the data and format we want {:title (:title md) :length (:duration md) }))) b (-> (video->bookmark video user-id) - (.map (fn [position] + (.map (rx/fn [position] {:bookmark position}))) r (-> (video->rating video user-id) - (.map (fn [rating] + (.map (rx/fn [rating] {:rating {:actual (:actual-star-rating rating) :average (:average-star-rating rating) :predicted (:predicted-star-rating rating) }})))] ; join these together into a single, merged map for each video - (Observable/zip m b r (fn [m b r] + (Observable/zip m b r (rx/fn [m b r] (merge {:id video} m b r))))))))))) @@ -91,9 +93,10 @@ "Returns an observable that executes (f observer) in a future, returning a subscription that will cancel the future." [f] - (Observable/create (fn [^Observer observer] + (Observable/create (rx/fn [^Observer observer] + (println "Starting f") (let [f (future (f observer))] - (Subscriptions/create #(future-cancel f)))))) + (Subscriptions/create (rx/action [] (future-cancel f))))))) (defn ^Observable get-list-of-lists " @@ -109,7 +112,8 @@ (.onCompleted observer)))) -(comment (.subscribe (get-list-of-lists 7777) println)) +(comment (.subscribe (get-list-of-lists 7777) + (rx/action* println))) (defn video-list [position] @@ -118,7 +122,7 @@ (defn ^Observable video-list->videos [{:keys [position] :as video-list}] - (Observable/create (fn [^Observer observer] + (Observable/create (rx/fn [^Observer observer] (dotimes [i 50] (.onNext observer (+ (* position 1000) i))) (.onCompleted observer) @@ -128,23 +132,25 @@ (defn ^Observable video->metadata [video-id] - (Observable/create (fn [^Observer observer] + (Observable/create (rx/fn [^Observer observer] (.onNext observer {:title (str "video-" video-id "-title") :actors ["actor1" "actor2"] :duration 5428 }) (.onCompleted observer) (Subscriptions/empty)))) -(comment (.subscribe (video->metadata 10) println)) +(comment (.subscribe (video->metadata 10) (rx/action* println))) (defn ^Observable video->bookmark [video-id user-id] (future-observable (fn [^Observer observer] (Thread/sleep 4) + (println "onNext") (.onNext observer (if (> (rand-int 6) 1) 0 (rand-int 4000))) + (println "onComplete") (.onCompleted observer)))) -(comment (.subscribe (video->bookmark 112345 99999) println)) +(comment (.subscribe (video->bookmark 112345 99999) (rx/action* println))) (defn ^Observable video->rating [video-id user-id] diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj deleted file mode 100644 index bcdebdc3bb..0000000000 --- a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj +++ /dev/null @@ -1,6 +0,0 @@ -(ns rx.lang.clojure.DummyClojureClass) - -(defn hello-world [username] - (println (format "Hello, %s" username))) - -(hello-world "world") diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/interop.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/interop.clj new file mode 100644 index 0000000000..de87e384dc --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/interop.clj @@ -0,0 +1,98 @@ +(ns rx.lang.clojure.interop + "Functions an macros for instantiating rx Func* and Action* interfaces." + (:refer-clojure :exclude [fn])) + +(def ^:private -ns- *ns*) +(set! *warn-on-reflection* true) + +(defmacro ^:private reify-callable + "Reify a bunch of Callable-like interfaces + + prefix fully qualified interface name. numbers will be appended + arities vector of desired arities + f the function to execute + + " + [prefix arities f] + (let [f-name (gensym "rc")] + `(let [~f-name ~f] + (reify + ~@(mapcat (clojure.core/fn [n] + (let [ifc-sym (symbol (str prefix n)) + arg-syms (map #(symbol (str "v" %)) (range n))] + `(~ifc-sym + (~'call ~(vec (cons 'this arg-syms)) + ~(cons f-name arg-syms))))) + arities) )))) + +(defn fn* + "Given function f, returns an object that implements rx.util.functions.Func0-9 + by delegating the call() method to the given function. + + If the f has the wrong arity, an ArityException will be thrown at runtime. + + Example: + + (.reduce my-numbers (rx/fn* +)) + + See: + http://netflix.github.io/RxJava/javadoc/rx/util/functions/Func0.html + " + [f] + (reify-callable "rx.util.functions.Func" [0 1 2 3 4 5 6 7 8 9] f)) + +(defn fnN* + "Given function f, returns an object that implements rx.util.functions.FuncN + by delegating to the given function. + + Unfortunately, this can't be included in fn* because of ambiguities between + the single arg call() method and the var args call method. + + See: + http://netflix.github.io/RxJava/javadoc/rx/util/functions/FuncN.html + " + [f] + (reify rx.util.functions.FuncN + (call [this objects] + (apply f objects)))) + +(defmacro fn + "Like clojure.core/fn, but returns the appropriate rx.util.functions.Func* + interface. + + Example: + + (.map my-observable (rx/fn [a] (* 2 a))) + + " + [& fn-form] + ; have to qualify fn*. Otherwise bad things happen with the fn* special form in clojure + `(rx.lang.clojure.interop/fn* (clojure.core/fn ~@fn-form))) + +(defn action* + "Given function f, returns an object that implements rx.util.functions.Action0-9 + by delegating to the given function. + + Example: + + (.subscribe my-observable (rx/action* println)) + + See: + http://netflix.github.io/RxJava/javadoc/rx/util/functions/Action0.html + " + [f] + (reify-callable "rx.util.functions.Action" [0 1 2 3] f)) + +(defmacro action + "Like clojure.core/fn, but returns the appropriate rx.util.functions.Action* + interface. + + Example: + + (.finallyDo my-observable (rx/action [] (println \"Finally!\"))) + + " + [& fn-form] + `(action* (clojure.core/fn ~@fn-form))) + +;################################################################################ diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj new file mode 100644 index 0000000000..72dcadaa6e --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj @@ -0,0 +1,88 @@ +(ns rx.lang.clojure.interop-test + (:require [rx.lang.clojure.interop :as rx] + [clojure.test :refer [deftest testing is]]) + (:import [rx Observable] + [rx.observables BlockingObservable] + )) + +(deftest test-fn* + (testing "implements Func0-9" + (let [f (rx/fn* vector)] + (is (instance? rx.util.functions.Func0 f)) + (is (instance? rx.util.functions.Func1 f)) + (is (instance? rx.util.functions.Func2 f)) + (is (instance? rx.util.functions.Func3 f)) + (is (instance? rx.util.functions.Func4 f)) + (is (instance? rx.util.functions.Func5 f)) + (is (instance? rx.util.functions.Func6 f)) + (is (instance? rx.util.functions.Func7 f)) + (is (instance? rx.util.functions.Func8 f)) + (is (instance? rx.util.functions.Func9 f)) + (is (= [] (.call f))) + (is (= [1] (.call f 1))) + (is (= [1 2] (.call f 1 2))) + (is (= [1 2 3] (.call f 1 2 3))) + (is (= [1 2 3 4] (.call f 1 2 3 4))) + (is (= [1 2 3 4 5] (.call f 1 2 3 4 5))) + (is (= [1 2 3 4 5 6] (.call f 1 2 3 4 5 6))) + (is (= [1 2 3 4 5 6 7] (.call f 1 2 3 4 5 6 7))) + (is (= [1 2 3 4 5 6 7 8] (.call f 1 2 3 4 5 6 7 8))) + (is (= [1 2 3 4 5 6 7 8 9] (.call f 1 2 3 4 5 6 7 8 9)))))) + +(deftest test-fn + (testing "makes appropriate Func*" + (let [f (rx/fn [a b c] (println "test-fn") (+ a b c))] + (is (= 6 (.call f 1 2 3)))))) + +(deftest test-fnN* + (testing "implements FuncN" + (is (= (vec (range 99)) + (.call (rx/fnN* vector) (into-array Object (range 99))))))) + +(deftest test-action* + (testing "implements Action0-3" + (let [calls (atom []) + a (rx/action* #(swap! calls conj (vec %&)))] + (is (instance? rx.util.functions.Action0 a)) + (is (instance? rx.util.functions.Action1 a)) + (is (instance? rx.util.functions.Action2 a)) + (is (instance? rx.util.functions.Action3 a)) + (.call a) + (.call a 1) + (.call a 1 2) + (.call a 1 2 3) + (is (= [[] [1] [1 2] [1 2 3]]))))) + +(deftest test-action + (testing "makes appropriate Action*" + (let [called (atom nil) + a (rx/action [a b] (reset! called [a b]))] + (.call a 9 10) + (is (= [9 10] @called))))) + +(deftest test-basic-usage + + (testing "can pass rx/fn to map and friends" + (is (= (+ 1 4 9) + (-> (Observable/from [1 2 3]) + (.map (rx/fn [v] (* v v))) + (.reduce (rx/fn* +)) + (BlockingObservable/single))))) + + (testing "can pass rx/action to subscribe and friends" + (let [finally-called (atom nil) + complete-called (promise) + result (atom []) + o (-> (Observable/from ["4" "5" "6"]) + (.map (rx/fn* #(Long/parseLong %))) + (.finallyDo (rx/action [] + (reset! finally-called true))) + (.reduce (rx/fn [a v] (* a v))) + (.subscribe (rx/action [v] (swap! result conj v)) + (rx/action [e]) + (rx/action [] (deliver complete-called true)))) ] + (is (= true @complete-called)) + (is (= true @finally-called)) + (is (= [(* 4 5 6)] @result))))) + +;################################################################################ diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj deleted file mode 100644 index f0521c2127..0000000000 --- a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj +++ /dev/null @@ -1,7 +0,0 @@ -(ns rx.lang.clojure.observable-tests - (import rx.Observable)) - -;; still need to get this wired up in build.gradle to run as tests -; (-> (rx.Observable/toObservable ["one" "two" "three"]) (.take 2) (.subscribe (fn [arg] (println arg)))) - -; (-> (rx.Observable/toObservable [1 2 3]) (.takeWhile (fn [x i] (< x 2))) (.subscribe (fn [arg] (println arg))))