From 787920da7d2748413cca252cf6286ea2a2492c25 Mon Sep 17 00:00:00 2001
From: Kai Waldrant <kai@data-intuitive.com>
Date: Thu, 11 Jul 2024 20:31:55 +0200
Subject: [PATCH] Add process datasets workflow script

---
 src/workflows/process_datasets/main.nf | 55 ++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)
 create mode 100644 src/workflows/process_datasets/main.nf

diff --git a/src/workflows/process_datasets/main.nf b/src/workflows/process_datasets/main.nf
new file mode 100644
index 0000000..88cf249
--- /dev/null
+++ b/src/workflows/process_datasets/main.nf
@@ -0,0 +1,55 @@
+include { findArgumentSchema } from "${meta.resources_dir}/helper.nf"
+
+workflow auto { // pipes the result of findStates(params, meta.config) into meta.workflow.run
+  findStates(params, meta.config)
+    | meta.workflow.run(
+      auto: [publish: "state"] // NOTE(review): presumably asks the runner to publish the final state - confirm in the Viash docs
+    )
+}
+
+workflow run_wf { // takes input_ch, schema-checks each item, drops failures, runs process_dataset, emits output_ch
+  take:
+  input_ch
+
+  main:
+  output_ch = input_ch
+    // step 1: run check_dataset_schema on state.input; the closures receive each item as (id, state)
+    | check_dataset_schema.run(
+      fromState: { id, state ->
+        def schema = findArgumentSchema(meta.config, "input") // presumably the schema of the "input" argument - verify in helper.nf
+        def schemaYaml = tempFile("schema.yaml")
+        writeYaml(schema, schemaYaml) // the schema is passed to the checker as a file, not inline
+        [
+          "input": state.input,
+          "schema": schemaYaml
+        ]
+      },
+      toState: { id, output, state ->
+        // read the checker's YAML output to see if the dataset passed the QC
+        def checks = readYaml(output.output)
+        state + [
+          "dataset": checks["exit_code"] == 0 ? state.input : null, // null marks a failed check; filtered out below
+        ]
+      }
+    )
+
+    // remove datasets which didn't pass the schema check
+    | filter { id, state ->
+      state.dataset != null
+    }
+    // step 2: pass state.dataset as the `input` of process_dataset
+    | process_dataset.run(
+      fromState: [ input: "dataset" ],
+      toState: [ // NOTE(review): presumably maps state key <- component output name; confirm against Viash toState docs
+        output_train: "output_train",
+        output_test: "output_test",
+        output_solution: "output_solution"
+      ]
+    )
+
+    // only output the files for which an output file was specified
+    | setState(["output_train", "output_test", "output_solution"])
+
+  emit:
+  output_ch
+}