diff --git a/src/k16/gx/beta/core.cljc b/src/k16/gx/beta/core.cljc index 1862f4c1..0ee5d0c5 100644 --- a/src/k16/gx/beta/core.cljc +++ b/src/k16/gx/beta/core.cljc @@ -58,13 +58,11 @@ {:initial-state :uninitialised :normalize {;; signal, whish is default for static component nodes :auto-signal :gx/start - :props-signals #{:gx/start :gx/stop}} + :props-signals #{:gx/start}} :signal-mapping {} - :signals {:gx/start {:order :topological - :from-states #{:stopped :uninitialised} + :signals {:gx/start {:from-states #{:stopped :uninitialised} :to-state :started} - :gx/stop {:order :reverse-topological - :from-states #{:started} + :gx/stop {:from-states #{:started} :to-state :stopped :deps-from :gx/start}}}) @@ -147,7 +145,7 @@ def? (and (map? signal-def) (some #{:gx/props :gx/props-fn :gx/processor :gx/deps - :gx/resolved-props :gx/after} + :gx/resolved-props} (keys signal-def))) with-pushed-down-form (if def? @@ -301,12 +299,32 @@ {:gx/type (if def? :component :static) :gx/normalized? true}))))) +(defn signal-dependencies + [{:keys [signals]}] + (->> signals + (map (fn [[k v]] + [k (if-let [f (:deps-from v)] + #{f} + #{})])) + (into {}))) + +(defn validate-context + "Validates context against schema and checks signal dependency errors" + [context] + (or (gx.schema/validate-context context) + (let [deps (signal-dependencies default-context)] + (->> deps + (impl/sccs) + (impl/dependency-errors deps) + (map impl/human-render-dependency-error) + (seq))))) + (defn normalize "Given a graph definition and config, return a normalised form. Idempotent. This acts as the static analysis step of the graph. Returns tuple of error explanation (if any) and normamized graph." [{:keys [context graph] :or {context default-context} :as gx-map}] - (let [config-issues (gx.schema/validate-context context) + (let [config-issues (validate-context context) gx-map (assoc gx-map :context context) ;; remove previous normalization errors gx-map' (cond-> gx-map @@ -328,36 +346,43 @@ (map (fn [[k node]] (let [deps (-> node signal-key - :gx/deps - (concat (or (:gx/after node) [])))] + :gx/deps)] [k (into #{} deps)]))) (into {}))) (defn topo-sort "Sorts graph nodes according to signal topology, returns vector of [error, sorted nodes]" - [{:keys [context graph]} signal-key] - (binding [*err-ctx* (->err-ctx {:error-type :deps-sort - :signal-key signal-key})] - (try - (if-let [signal-config (get-in context [:signals signal-key])] - (let [deps-from (or (:deps-from signal-config) - signal-key) - graph-deps (graph-dependencies graph deps-from) - sorted-raw (impl/sccs graph-deps)] - (when-let [errors (->> sorted-raw - (impl/dependency-errors graph-deps) - (map impl/human-render-dependency-error) - (seq))] - (throw-gx-error "Dependency errors" {:errors errors})) - [nil - (let [topo-sorted (map first sorted-raw)] - (if (= :topological (:order signal-config)) - topo-sorted - (reverse topo-sorted)))]) - (throw-gx-error (str "Unknown signal key '" signal-key "'"))) - (catch ExceptionInfo e - [(assoc (ex-data e) :message (ex-message e))])))) + ([gx-map signal-key] + (topo-sort gx-map signal-key #{})) + ([{:keys [context graph]} signal-key priority-selector] + (binding [*err-ctx* (->err-ctx {:error-type :deps-sort + :signal-key signal-key})] + (try + (if-let [signal-config (get-in context [:signals signal-key])] + (let [deps-from (or (:deps-from signal-config) + signal-key) + graph-deps (->> deps-from + (graph-dependencies graph) + (map (fn [[k deps :as signal-deps]] + (if (contains? priority-selector k) + signal-deps + [k (into deps priority-selector)]))) + (into {})) + sorted-raw (impl/sccs graph-deps)] + (when-let [errors (->> sorted-raw + (impl/dependency-errors graph-deps) + (map impl/human-render-dependency-error) + (seq))] + (throw-gx-error "Dependency errors" {:errors errors})) + [nil + (let [topo-sorted (map first sorted-raw)] + (if (:deps-from signal-config) + (reverse topo-sorted) + topo-sorted))]) + (throw-gx-error (str "Unknown signal key '" signal-key "'"))) + (catch ExceptionInfo e + [(assoc (ex-data e) :message (ex-message e))]))))) (defn get-component-props [graph property-key] @@ -472,25 +497,28 @@ (update gx-map :failures conj failure) gx-map)) -(defn signal [gx-map signal-key] - (let [gx-map (normalize (dissoc gx-map :failures)) - [error sorted] (topo-sort gx-map signal-key) - gx-map (if error - (update gx-map :failures conj error) - gx-map)] - (if (seq (:failures gx-map)) - (p/resolved gx-map) - (p/loop [gxm gx-map - sorted sorted] - (cond - (seq sorted) - (p/let [node-key (first sorted) - node (binding [*err-ctx* (->err-ctx - {:error-type :node-signal - :signal-key signal-key - :node-key node-key})] - (node-signal gxm node-key signal-key)) - next-gxm (assoc-in gxm [:graph node-key] node)] - (p/recur (merge-node-failure next-gxm node) (rest sorted))) - - :else gxm))))) +(defn signal + ([gx-map signal-key] + (signal gx-map signal-key #{})) + ([gx-map signal-key priority-selector] + (let [gx-map (normalize (dissoc gx-map :failures)) + [error sorted] (topo-sort gx-map signal-key priority-selector) + gx-map (if error + (update gx-map :failures conj error) + gx-map)] + (if (seq (:failures gx-map)) + (p/resolved gx-map) + (p/loop [gxm gx-map + sorted sorted] + (cond + (seq sorted) + (p/let [node-key (first sorted) + node (binding [*err-ctx* (->err-ctx + {:error-type :node-signal + :signal-key signal-key + :node-key node-key})] + (node-signal gxm node-key signal-key)) + next-gxm (assoc-in gxm [:graph node-key] node)] + (p/recur (merge-node-failure next-gxm node) (rest sorted))) + + :else gxm)))))) diff --git a/src/k16/gx/beta/schema.cljc b/src/k16/gx/beta/schema.cljc index 18295be8..f7ff484a 100644 --- a/src/k16/gx/beta/schema.cljc +++ b/src/k16/gx/beta/schema.cljc @@ -6,7 +6,6 @@ (def ?SignalConfig [:map {:closed true} - [:order [:enum :topological :reverse-topological]] [:deps {:optional true} [:sequential keyword?]] [:from-states [:set keyword?]] [:to-state keyword?] @@ -40,7 +39,6 @@ [:gx/props-schema {:optional true} any?] [:gx/resolved-props-fn {:optional true} [:maybe fn?]] [:gx/deps {:optional true} coll?] - [:gx/after {:optional true} set?] [:gx/resolved-props {:optional true} [:maybe any?]]]) (def ?NormalizedNodeDefinition diff --git a/test/k16/gx/beta/core_test.cljc b/test/k16/gx/beta/core_test.cljc index e27b71e6..ae2473df 100644 --- a/test/k16/gx/beta/core_test.cljc +++ b/test/k16/gx/beta/core_test.cljc @@ -105,11 +105,9 @@ :normalize {:auto-signal :custom/start :props-signals #{:custom/start}} :signals - {:custom/start {:order :topological - :from-states #{:stopped :uninitialized} + {:custom/start {:from-states #{:stopped :uninitialized} :to-state :started} - :custom/stop {:order :reverse-topological - :from-states #{:started} + :custom/stop {:from-states #{:started} :to-state :stopped :deps-from :gx/start}}} graph {:a {:nested-a 1} @@ -444,23 +442,51 @@ first (update :internal-data dissoc :component-schema))))))) -(def server-component - {:gx/start {:gx/processor (fn [_] :http-server)} - :gx/stop {:gx/processor (fn [_] nil)}}) +(def ^:export server-component + {:gx/start {:gx/processor (fn [{:keys [props]}] + (swap! (:flow props) conj :server))} + :gx/stop {:gx/processor (fn [{:keys [props]}] + (swap! (:flow props) conj :server) + nil)}}) + +(def ^:export db-component + {:gx/start {:gx/processor (fn [{:keys [props]}] + (swap! (:flow props) conj :db))} + :gx/stop {:gx/processor (fn [{:keys [props]}] + (swap! (:flow props) conj :db) + nil)}}) -(def logger-component - {:gx/start {:gx/processor (fn [_] :logger)} - :gx/stop {:gx/processor (fn [_] nil)}}) +(def ^:export logger-component + {:gx/start {:gx/processor (fn [{:keys [props]}] + (swap! (:flow props) conj :logger))} + :gx/stop {:gx/processor (fn [{:keys [props]}] + (swap! (:flow props) conj :logger) + nil)}}) -(deftest signal-flow-dependency-test - (let [graph {:logger {:gx/component 'k16.gx.beta.core-test/logger-component} +(deftest signal-selector-test + (let [flow (atom []) + graph {:logger {:gx/component 'k16.gx.beta.core-test/logger-component + :gx/props {:flow flow}} :options {:port 8080} - :other {:gx/after #{:server}} + :db {:gx/component 'k16.gx.beta.core-test/db-component + :gx/props {:flow flow}} :server {:gx/component 'k16.gx.beta.core-test/server-component - :gx/props '(gx/ref :options) - :gx/after #{:logger}}} - norm (gx/normalize {:graph graph})] - (is (= '(:server :other :options :logger) - (second (gx/topo-sort norm :gx/stop)))) - (is (= '(:logger :options :other :server) - (second (gx/topo-sort norm :gx/start)))))) + :gx/props {:opts '(gx/ref :options) + :flow flow}}} + norm (gx/normalize {:graph graph}) + gx-started (gx/signal norm :gx/start #{:logger})] + #?@(:clj [@gx-started + (is (= [:logger :db :server] @flow)) + (reset! flow []) + @(gx/signal @gx-started :gx/stop #{:logger}) + (is (= [:server :db :logger] @flow))] + :cljs [(t/async + done + (p/then gx-started + (fn [s] + (is (= [:logger :db :server] @flow)) + (reset! flow []) + (p/then (gx/signal s :gx/stop #{:logger}) + (fn [_] + (is (= [:server :db :logger] @flow)) + (done))))))])))