Skip to content

Commit

Permalink
feat: selectors + elegant way to selector topology
Browse files Browse the repository at this point in the history
  • Loading branch information
armed committed Jun 9, 2022
1 parent 0293b0a commit 42792b1
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 75 deletions.
134 changes: 81 additions & 53 deletions src/k16/gx/beta/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@
{:initial-state :uninitialised
:normalize {;; signal, whish is default for static component nodes
:auto-signal :gx/start
:props-signals #{:gx/start :gx/stop}}
:props-signals #{:gx/start}}
:signal-mapping {}
:signals {:gx/start {:order :topological
:from-states #{:stopped :uninitialised}
:signals {:gx/start {:from-states #{:stopped :uninitialised}
:to-state :started}
:gx/stop {:order :reverse-topological
:from-states #{:started}
:gx/stop {:from-states #{:started}
:to-state :stopped
:deps-from :gx/start}}})

Expand Down Expand Up @@ -147,7 +145,7 @@
def? (and (map? signal-def)
(some #{:gx/props :gx/props-fn
:gx/processor :gx/deps
:gx/resolved-props :gx/after}
:gx/resolved-props}
(keys signal-def)))
with-pushed-down-form
(if def?
Expand Down Expand Up @@ -301,12 +299,32 @@
{:gx/type (if def? :component :static)
:gx/normalized? true})))))

(defn signal-dependencies
[{:keys [signals]}]
(->> signals
(map (fn [[k v]]
[k (if-let [f (:deps-from v)]
#{f}
#{})]))
(into {})))

(defn validate-context
"Validates context against schema and checks signal dependency errors"
[context]
(or (gx.schema/validate-context context)
(let [deps (signal-dependencies default-context)]
(->> deps
(impl/sccs)
(impl/dependency-errors deps)
(map impl/human-render-dependency-error)
(seq)))))

(defn normalize
"Given a graph definition and config, return a normalised form. Idempotent.
This acts as the static analysis step of the graph.
Returns tuple of error explanation (if any) and normamized graph."
[{:keys [context graph] :or {context default-context} :as gx-map}]
(let [config-issues (gx.schema/validate-context context)
(let [config-issues (validate-context context)
gx-map (assoc gx-map :context context)
;; remove previous normalization errors
gx-map' (cond-> gx-map
Expand All @@ -328,36 +346,43 @@
(map (fn [[k node]]
(let [deps (-> node
signal-key
:gx/deps
(concat (or (:gx/after node) [])))]
:gx/deps)]
[k (into #{} deps)])))
(into {})))

(defn topo-sort
"Sorts graph nodes according to signal topology, returns vector of
[error, sorted nodes]"
[{:keys [context graph]} signal-key]
(binding [*err-ctx* (->err-ctx {:error-type :deps-sort
:signal-key signal-key})]
(try
(if-let [signal-config (get-in context [:signals signal-key])]
(let [deps-from (or (:deps-from signal-config)
signal-key)
graph-deps (graph-dependencies graph deps-from)
sorted-raw (impl/sccs graph-deps)]
(when-let [errors (->> sorted-raw
(impl/dependency-errors graph-deps)
(map impl/human-render-dependency-error)
(seq))]
(throw-gx-error "Dependency errors" {:errors errors}))
[nil
(let [topo-sorted (map first sorted-raw)]
(if (= :topological (:order signal-config))
topo-sorted
(reverse topo-sorted)))])
(throw-gx-error (str "Unknown signal key '" signal-key "'")))
(catch ExceptionInfo e
[(assoc (ex-data e) :message (ex-message e))]))))
([gx-map signal-key]
(topo-sort gx-map signal-key #{}))
([{:keys [context graph]} signal-key priority-selector]
(binding [*err-ctx* (->err-ctx {:error-type :deps-sort
:signal-key signal-key})]
(try
(if-let [signal-config (get-in context [:signals signal-key])]
(let [deps-from (or (:deps-from signal-config)
signal-key)
graph-deps (->> deps-from
(graph-dependencies graph)
(map (fn [[k deps :as signal-deps]]
(if (contains? priority-selector k)
signal-deps
[k (into deps priority-selector)])))
(into {}))
sorted-raw (impl/sccs graph-deps)]
(when-let [errors (->> sorted-raw
(impl/dependency-errors graph-deps)
(map impl/human-render-dependency-error)
(seq))]
(throw-gx-error "Dependency errors" {:errors errors}))
[nil
(let [topo-sorted (map first sorted-raw)]
(if (:deps-from signal-config)
(reverse topo-sorted)
topo-sorted))])
(throw-gx-error (str "Unknown signal key '" signal-key "'")))
(catch ExceptionInfo e
[(assoc (ex-data e) :message (ex-message e))])))))

(defn get-component-props
[graph property-key]
Expand Down Expand Up @@ -472,25 +497,28 @@
(update gx-map :failures conj failure)
gx-map))

(defn signal [gx-map signal-key]
(let [gx-map (normalize (dissoc gx-map :failures))
[error sorted] (topo-sort gx-map signal-key)
gx-map (if error
(update gx-map :failures conj error)
gx-map)]
(if (seq (:failures gx-map))
(p/resolved gx-map)
(p/loop [gxm gx-map
sorted sorted]
(cond
(seq sorted)
(p/let [node-key (first sorted)
node (binding [*err-ctx* (->err-ctx
{:error-type :node-signal
:signal-key signal-key
:node-key node-key})]
(node-signal gxm node-key signal-key))
next-gxm (assoc-in gxm [:graph node-key] node)]
(p/recur (merge-node-failure next-gxm node) (rest sorted)))

:else gxm)))))
(defn signal
([gx-map signal-key]
(signal gx-map signal-key #{}))
([gx-map signal-key priority-selector]
(let [gx-map (normalize (dissoc gx-map :failures))
[error sorted] (topo-sort gx-map signal-key priority-selector)
gx-map (if error
(update gx-map :failures conj error)
gx-map)]
(if (seq (:failures gx-map))
(p/resolved gx-map)
(p/loop [gxm gx-map
sorted sorted]
(cond
(seq sorted)
(p/let [node-key (first sorted)
node (binding [*err-ctx* (->err-ctx
{:error-type :node-signal
:signal-key signal-key
:node-key node-key})]
(node-signal gxm node-key signal-key))
next-gxm (assoc-in gxm [:graph node-key] node)]
(p/recur (merge-node-failure next-gxm node) (rest sorted)))

:else gxm))))))
2 changes: 0 additions & 2 deletions src/k16/gx/beta/schema.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

(def ?SignalConfig
[:map {:closed true}
[:order [:enum :topological :reverse-topological]]
[:deps {:optional true} [:sequential keyword?]]
[:from-states [:set keyword?]]
[:to-state keyword?]
Expand Down Expand Up @@ -40,7 +39,6 @@
[:gx/props-schema {:optional true} any?]
[:gx/resolved-props-fn {:optional true} [:maybe fn?]]
[:gx/deps {:optional true} coll?]
[:gx/after {:optional true} set?]
[:gx/resolved-props {:optional true} [:maybe any?]]])

(def ?NormalizedNodeDefinition
Expand Down
66 changes: 46 additions & 20 deletions test/k16/gx/beta/core_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,9 @@
:normalize {:auto-signal :custom/start
:props-signals #{:custom/start}}
:signals
{:custom/start {:order :topological
:from-states #{:stopped :uninitialized}
{:custom/start {:from-states #{:stopped :uninitialized}
:to-state :started}
:custom/stop {:order :reverse-topological
:from-states #{:started}
:custom/stop {:from-states #{:started}
:to-state :stopped
:deps-from :gx/start}}}
graph {:a {:nested-a 1}
Expand Down Expand Up @@ -444,23 +442,51 @@
first
(update :internal-data dissoc :component-schema)))))))

(def server-component
{:gx/start {:gx/processor (fn [_] :http-server)}
:gx/stop {:gx/processor (fn [_] nil)}})
(def ^:export server-component
{:gx/start {:gx/processor (fn [{:keys [props]}]
(swap! (:flow props) conj :server))}
:gx/stop {:gx/processor (fn [{:keys [props]}]
(swap! (:flow props) conj :server)
nil)}})

(def ^:export db-component
{:gx/start {:gx/processor (fn [{:keys [props]}]
(swap! (:flow props) conj :db))}
:gx/stop {:gx/processor (fn [{:keys [props]}]
(swap! (:flow props) conj :db)
nil)}})

(def logger-component
{:gx/start {:gx/processor (fn [_] :logger)}
:gx/stop {:gx/processor (fn [_] nil)}})
(def ^:export logger-component
{:gx/start {:gx/processor (fn [{:keys [props]}]
(swap! (:flow props) conj :logger))}
:gx/stop {:gx/processor (fn [{:keys [props]}]
(swap! (:flow props) conj :logger)
nil)}})

(deftest signal-flow-dependency-test
(let [graph {:logger {:gx/component 'k16.gx.beta.core-test/logger-component}
(deftest signal-selector-test
(let [flow (atom [])
graph {:logger {:gx/component 'k16.gx.beta.core-test/logger-component
:gx/props {:flow flow}}
:options {:port 8080}
:other {:gx/after #{:server}}
:db {:gx/component 'k16.gx.beta.core-test/db-component
:gx/props {:flow flow}}
:server {:gx/component 'k16.gx.beta.core-test/server-component
:gx/props '(gx/ref :options)
:gx/after #{:logger}}}
norm (gx/normalize {:graph graph})]
(is (= '(:server :other :options :logger)
(second (gx/topo-sort norm :gx/stop))))
(is (= '(:logger :options :other :server)
(second (gx/topo-sort norm :gx/start))))))
:gx/props {:opts '(gx/ref :options)
:flow flow}}}
norm (gx/normalize {:graph graph})
gx-started (gx/signal norm :gx/start #{:logger})]
#?@(:clj [@gx-started
(is (= [:logger :db :server] @flow))
(reset! flow [])
@(gx/signal @gx-started :gx/stop #{:logger})
(is (= [:server :db :logger] @flow))]
:cljs [(t/async
done
(p/then gx-started
(fn [s]
(is (= [:logger :db :server] @flow))
(reset! flow [])
(p/then (gx/signal s :gx/stop #{:logger})
(fn [_]
(is (= [:server :db :logger] @flow))
(done))))))])))

1 comment on commit 42792b1

@vercel
Copy link

@vercel vercel bot commented on 42792b1 Jun 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

gx – ./

gx-kepler16.vercel.app
gx.kepler16.com
gx-git-master-kepler16.vercel.app

Please sign in to comment.