Page observation pipeline subject by graph
The subject-cache for the observation pipeline has become too big.

We need to handle 30 million subjects or more.

Downloading these with limit/offset was impossible, as we couldn't sort and retrieve later pages quickly enough.

Even a single download exceeded the 15-minute Stardog timeout and caused drafter to hang in GC (presumably because stasher would hold the whole 3GB+ result in memory).

This patch introduces an optional `graph-query` parameter that is applied to the `observation-pipeline`. We use it to page the subject-query by graph, and then into page-size partitions.
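
To illustrate, each graph URI returned by the new `observation-graph.sparql` query is bound into the subject query with a `VALUES` clause, so each per-graph request looks roughly like this (the graph URI below is hypothetical):

PREFIX qb: <http://purl.org/linked-data/cube#>
PREFIX pmdcat: <http://publishmydata.com/pmdcat#>

SELECT ?observation WHERE {
  VALUES ?graph { <http://example.org/graph/my-cube> }
  ?dataset pmdcat:datasetContents ?cube .
  ?observation qb:dataSet ?cube .
  GRAPH ?graph {
    ?observation a qb:Observation .
  }
}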

Hopefully this fixes #103.
Robsteranium committed May 19, 2021
1 parent 57dc5a9 commit 150c73a
Showing 8 changed files with 96,725 additions and 90,675 deletions.
3,432 changes: 1,716 additions & 1,716 deletions cassettes/code-pipeline.edn

Large diffs are not rendered by default.

704 changes: 352 additions & 352 deletions cassettes/component-pipeline.edn

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cassettes/dataset-pipeline.edn
@@ -15,7 +15,7 @@
{"Alt-Svc" "clear",
"Connection" "close",
"Content-Type" "text/csv",
"Date" "Wed, 12 May 2021 15:53:18 GMT",
"Date" "Wed, 19 May 2021 11:50:39 GMT",
"Server" "nginx",
"Strict-Transport-Security" "max-age=15768000",
"Transfer-Encoding" "chunked",
@@ -138,7 +138,7 @@
{"Alt-Svc" "clear",
"Connection" "close",
"Content-Type" "application/n-triples",
"Date" "Wed, 12 May 2021 15:53:19 GMT",
"Date" "Wed, 19 May 2021 11:50:39 GMT",
"Server" "nginx",
"Strict-Transport-Security" "max-age=15768000",
"Transfer-Encoding" "chunked",
@@ -153,4 +153,4 @@
:streaming? true,
:trace-redirects []},
:var-name "clj-http.core/request"}],
:recorded-at #inst "2021-05-12T15:53:18.706-00:00"}
:recorded-at #inst "2021-05-19T11:50:39.410-00:00"}
4 changes: 2 additions & 2 deletions cassettes/extract-datasets.edn
@@ -175,7 +175,7 @@
{"Alt-Svc" "clear",
"Connection" "close",
"Content-Type" "application/n-triples",
"Date" "Wed, 12 May 2021 15:52:25 GMT",
"Date" "Wed, 19 May 2021 11:50:40 GMT",
"Server" "nginx",
"Strict-Transport-Security" "max-age=15768000",
"Transfer-Encoding" "chunked",
@@ -190,4 +190,4 @@
:streaming? true,
:trace-redirects []},
:var-name "clj-http.core/request"}],
:recorded-at #inst "2021-05-12T15:52:25.647-00:00"}
:recorded-at #inst "2021-05-19T11:50:40.418-00:00"}
183,134 changes: 94,568 additions & 88,566 deletions cassettes/fixtures.edn

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions resources/etl/observation-graph.sparql
@@ -0,0 +1,3 @@
SELECT ?graph WHERE {
?dataset <http://publishmydata.com/pmdcat#graph> ?graph .
}
6 changes: 3 additions & 3 deletions resources/etl/observation-select.sparql
@@ -1,7 +1,7 @@
PREFIX qb: <http://purl.org/linked-data/cube#>
PREFIX pmdcat: <http://publishmydata.com/pmdcat#>

SELECT ?observation WHERE {
?dataset pmdcat:datasetContents ?cube .
?observation qb:dataSet ?cube .
GRAPH ?graph {
?observation a qb:Observation .
}
}
111 changes: 78 additions & 33 deletions src/ook/etl.clj
@@ -15,6 +15,7 @@
(com.github.jsonldjava.utils JsonUtils)
(com.github.jsonldjava.core JsonLdOptions JsonLdProcessor)))


;; Pipeline debugging

(defn write-to-disk
@@ -71,49 +72,82 @@
clause (str "\n VALUES ?" var-name " { " terms " }\n")]
(str top clause bottom)))

(defn spill-to-disk
"Writes data to file"
[data file]
(with-open [os (io/output-stream file)
is (io/input-stream data)]
(io/copy is os)))

(defn read-paged
"Reads lines out of file in lazy-seq of pages.
Pages are vectors - the variable name followed by the values."
[file page-size]
(let [rdr (io/reader file)
var-name (.readLine rdr)
read-lines (fn this [r]
(lazy-seq
(if-let [line (.readLine r)]
(cons line (this r))
(.close r))))]
(->> (read-lines rdr)
(partition-all page-size)
(map (fn [page] (cons var-name page))))))
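
;; Illustrative only (not part of this diff): given a cache file whose header
;; line is "observation", (read-paged file 2) yields pages shaped like
;; ("observation" "http://example.org/obs/1" "http://example.org/obs/2"),
;; with hypothetical example URIs.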

(defn select-paged
"Executes a select query returning results in a lazy seq of pages.
Query results are spilled to disk and read out one page at a time.
Pages are vectors - the variable name followed by the values."
([client query-string]
(select-paged client query-string 50000))
([client query-string page-size]
(let [cache-file (File/createTempFile "ook-etl-subject-cache-" ".tmp")]
Pages are vectors - the variable name followed by the values.
Query results are spilled to disk and read out in page-size partitions.
If a graph query is provided, the subject-query is first paged by graph,
one graph at a time."
([client subject-query page-size]
(let [subject-cache (File/createTempFile "ook-etl-subject-cache-" ".tmp")]
(try
;; write results to tempfile cache
(let [client (interceptors/accept client "text/csv")]
(with-open [cache (io/output-stream cache-file)
result (io/input-stream (query client query-string))]
(io/copy result cache)))
;; read results from tempfile cache
(let [rdr (io/reader cache-file)
var-name (.readLine rdr)
read-lines (fn this [r]
(lazy-seq
(if-let [line (.readLine r)]
(cons line (this r))
(.close r))))]
;; paginate
(->> (read-lines rdr)
(partition-all page-size)
(map (fn [page]
(cons var-name page)))))
(finally (.delete cache-file))))))
(spill-to-disk (query client subject-query) subject-cache))
(read-paged subject-cache page-size)
(finally (.delete subject-cache)))))
([client graph-query subject-query page-size]
(let [graph-cache (File/createTempFile "ook-etl-graph-cache-" ".tmp")]
(try
(let [client (interceptors/accept client "text/csv")]
(spill-to-disk (query client graph-query) graph-cache))
(mapcat (fn [[var-name values]]
(let [subject-query (insert-values-clause subject-query var-name [values])]
(select-paged client subject-query page-size)))
(read-paged graph-cache 1)) ;; read multi-graph datasets one graph at a time
(finally (.delete graph-cache))))))
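
;; A minimal usage sketch (assumed, not part of this commit): page observation
;; subjects one graph at a time, 50,000 subjects per page.
;;
;; (select-paged client
;;               (slurp (io/resource "etl/observation-graph.sparql"))
;;               (slurp (io/resource "etl/observation-select.sparql"))
;;               50000)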



;; Extract
(defn with-dataset-scope
"Scopes query to target datasets, if present"
[query datasets]
(if datasets
(insert-values-clause query "dataset" datasets)
query))

(defn subject-pages
"Executes a query to get a collection of URIs. Returns pages of URIs for
inserting into another query. Each page is a vector beginning with the var name
followed by the URIs. NB: Only expecting a single variable to be bound in the results.
See `insert-values-clause`."
[{:keys [drafter-client/client ook.etl/target-datasets ook.etl/select-page-size] :as system} subject-query]
(let [subject-query (if target-datasets
(insert-values-clause subject-query "dataset" target-datasets)
subject-query)]
(select-paged client subject-query (or select-page-size 50000))))
followed by the URIs."
([{:keys [drafter-client/client
ook.etl/target-datasets
ook.etl/select-page-size]
:as system
:or {select-page-size 50000}} subject-query]
(select-paged client
(with-dataset-scope subject-query target-datasets)
select-page-size))
([{:keys [drafter-client/client
ook.etl/target-datasets
ook.etl/select-page-size]
:as system
:or {select-page-size 50000}} graph-query subject-query]
(select-paged client
(with-dataset-scope graph-query target-datasets)
(with-dataset-scope subject-query target-datasets)
select-page-size)))
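
;; For example (hypothetical system map, not from this commit):
;;
;; (subject-pages {:drafter-client/client client
;;                 :ook.etl/select-page-size 10000}
;;                graph-query subject-query)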

(defn extract
"Executes the construct query binding in values from page"
@@ -194,11 +228,11 @@

;; Pipeline

-(defn pipeline-fn [page-query construct-query jsonld-frame index]
+(defn pipeline-fn* [pager-fn construct-query jsonld-frame index]
(fn [system]
(log/info (str "Pipeline Started: " index))
(let [counter (atom 0)]
-(doseq [[var-name & uris] (subject-pages system page-query)]
+(doseq [[var-name & uris] (pager-fn system)]
(log/info "Processing page starting with" index "subject" @counter)
(if uris
(do
@@ -211,6 +245,16 @@
(log/warn (str "No compatible (" index ") subjects found!")))))
(log/info (str "Pipeline Complete: " index))))

(defn pipeline-fn
([subject-query construct-query jsonld-frame index]
(pipeline-fn*
(fn [system] (subject-pages system subject-query))
construct-query jsonld-frame index))
([graph-query subject-query construct-query jsonld-frame index]
(pipeline-fn*
(fn [system] (subject-pages system graph-query subject-query))
construct-query jsonld-frame index)))

(def dataset-pipeline
(pipeline-fn
(slurp (io/resource "etl/dataset-select.sparql"))
@@ -241,6 +285,7 @@

(def observation-pipeline
(pipeline-fn
(slurp (io/resource "etl/observation-graph.sparql"))
(slurp (io/resource "etl/observation-select.sparql"))
(slurp (io/resource "etl/observation-construct.sparql"))
(slurp (io/resource "etl/observation-frame.json"))
