Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix global supervisor instance, add type hints. #13

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@
:main-opts ["-m" "hf.depstar.jar" "replikativ-superv-async.jar"]}
:deploy {:extra-deps {deps-deploy/deps-deploy {:mvn/version "0.0.9"}}
:main-opts ["-m" "deps-deploy.deps-deploy" "deploy" "replikativ-superv-async.jar"]}
:ffix {:extra-deps {cljfmt/cljfmt {:mvn/version "0.8.0"}}
:main-opts ["-m" "cljfmt.main" "fix"]}
:format {:extra-deps {cljfmt/cljfmt {:mvn/version "0.7.0"}}
:main-opts ["-m" "cljfmt.main" "check"]}}}
:main-opts ["-m" "cljfmt.main" "check"]}}}
35 changes: 26 additions & 9 deletions src/superv/async.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
on-abort go-super go-loop-super go-for alts?]]))
#?(:clj (:import (clojure.core.async.impl.protocols ReadPort))))



;; The protocols and the binding are needed for the channel ops to be
;; transparent for supervision, most importantly exception tracking


(defprotocol PSupervisor
(-error [this])
(-abort [this])
Expand Down Expand Up @@ -80,19 +77,42 @@
(take! (timeout stale-timeout) pending))) nil)
s))

(defn dummy-supervisor []
(map->TrackingSupervisor
{:error (chan)
:aborts (vec (repeatedly NUM_ABORT_CHANS #(promise-chan)))
:registered (atom {})
:pending-exceptions (atom {})}))

(defn throw-if-exception-
"Helper method that checks if x is Exception and if yes, wraps it in a new
exception, passing though ex-data if any, and throws it. The wrapping is done
to maintain a full stack trace when jumping between multiple contexts."
[x]
(if (instance? #?(:clj Exception :cljs js/Error) x)
(throw (ex-info (or #?(:clj (.getMessage x)) (str x))
(throw (ex-info (or (:clj (.getMessage ^Exception x)) (str x))
(or (ex-data x) {})
x))
x))

#?(:clj
(defmacro native-image-build? []
(try
(and (Class/forName "org.graalvm.nativeimage.ImageInfo")
#_(eval '(org.graalvm.nativeimage.ImageInfo/inImageBuildtimeCode)))
(catch Exception _
false))))

;; a simple global instance, will probably be removed
(def S (simple-supervisor))
(def S
(try
;; We cannot run the simple-supervisor thread in a static context inside
;; native image.
(if (native-image-build?)
(dummy-supervisor)
(simple-supervisor))
(catch Exception _
(simple-supervisor))))

(defn throw-if-exception
"Helper method that checks if x is Exception and if yes, wraps it in a new
Expand All @@ -101,7 +121,7 @@
[S x]
(if (instance? #?(:clj Exception :cljs js/Error) x)
(do (-free-exception S x)
(throw (ex-info (or #?(:clj (.getMessage x)) (str x))
(throw (ex-info (or (:clj (.getMessage ^Exception x)) (str x))
(or (ex-data x) {})
x)))
x))
Expand Down Expand Up @@ -679,11 +699,8 @@ Throws if any result is an exception or the context has been aborted."
[S buf-or-n xform]
(chan buf-or-n xform (fn [e] (put! (:error S) e))))



;; taken from clojure/core ~ 1.7


#?(:clj
(defmacro ^{:private true} assert-args
[& pairs]
Expand Down
18 changes: 0 additions & 18 deletions test/superv/async_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,8 @@
(is (thrown? Exception
(<??* S [(go "1") (go (Exception.))]))))))




;; alt?


(deftest test-alt?
(testing "Test alt? error handling."
(test-async
Expand Down Expand Up @@ -215,10 +211,8 @@
(<?? S (thread-try S
(throw (ex-info "bar" {})))))))))


;; thread-super


#?(:clj
(deftest test-thread-super
(let [err-ch (chan)
Expand All @@ -237,11 +231,8 @@
(thread-super super (/ 1 0))
(<?? S err-ch)))))))



;; go-loop-try


(deftest test-go-loop-try
(test-async
(go
Expand Down Expand Up @@ -278,10 +269,8 @@
(finally (reset! finally-state 42))))
(= @exception-state @finally-state 42)))))))


;; go-loop-super


(deftest test-go-loop-super
(let [err-ch (chan)
abort (chan)
Expand All @@ -296,10 +285,8 @@
(go (is (thrown? #?(:clj Exception :cljs js/Error)
(<? super err-ch)))))))


;; go-for


(deftest test-go-for ;; traditional for comprehension
(test-async
(go
Expand Down Expand Up @@ -327,11 +314,8 @@
:let [b #?(:clj (/ 1 0) :cljs (throw (js/Error. "Oops")))]]
42)))))))



;; supervisor


(deftest test-supervisor
(test-async
(do
Expand Down Expand Up @@ -404,10 +388,8 @@
(go (is (thrown? #?(:clj Exception :cljs js/Error)
(<? S (restarting-supervisor start-fn :retries 3 :stale-timeout 100))))))))


;; transducer embedding


(deftest test-transducer-error-handling
(let [err-ch (chan)
abort (chan)
Expand Down