-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGeneMania.scala
122 lines (104 loc) · 5.58 KB
/
GeneMania.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package galliaexample.genemania
import scala.util.chaining._
import aptus._
import gallia._
// ===========================================================================
/**
 * Reads GeneMANIA interaction-weight files plus their `networks.txt` index and writes
 * one restructured entity per gene (see [[restructure]] for the resulting shape).
 *
 * @param inputDirPath     directory containing `networks.txt` and the per-network weight files
 * @param inputCompression suffix appended verbatim to every input file name when building paths
 *                         (presumably a compression extension, possibly empty - TODO confirm)
 * @param maxFiles         maximum number of weight files to process; `None` means all. Also forwarded to
 *                         `mapWithCheckpointingGroups`, whose semantics are defined in
 *                         `SparkDagTooBigWorkaround` (not visible here)
 * @param outputPath       destination handed to [[outputWriter]]
 */
class GeneMania(
    inputDirPath: String, inputCompression: String, maxFiles: Option[Int] = None /* None is all */,
    outputPath  : String) {
  import SparkDagTooBigWorkaround.Implicits

  // ===========================================================================
  // just a trick to simplify reusing code in the spark-version, could all be inlined for non-spark run
  // (kept as reassignable `var`s on purpose: they are the extension points to override)

  /** Reads one weight file (TSV, iterator mode) with columns Gene_A, Gene_B, Weight. */
  var weightInputReader : String => HeadS = path => path.stream(_.tsv.iteratorMode.schema("Gene_A".string, "Gene_B".string, "Weight".double))

  /** Reads the networks index (TSV) with columns File_Name, Network_Group_Name, Network_Name, Source, Pubmed_ID. */
  var networkInputReader: String => HeadS = path => path.stream(_.tsv             .schema("File_Name".string, "Network_Group_Name".string, "Network_Name".string, "Source".string, "Pubmed_ID".string))

  /** Writes the final result to the given path; the value returned by `write` is discarded. */
  var outputWriter: String => HeadS => Unit = path => x => { x.write(path); () }

  var checkpointingHook: HeadS => HeadS = identity // don't checkpoint for non-spark run
  var coalescingHook   : HeadS => HeadS = identity // don't coalesce   for non-spark run

  // ===========================================================================
  /**
   * Runs the whole pipeline: union of all weight files -> [[restructure]] -> [[coalescingHook]] -> [[outputWriter]].
   * Progress is logged on the incoming rows and on the outgoing (per-gene) entities.
   */
  def apply(): Unit = {
    union()
      .logProgress(/* every */ 100000 /* row   */, "incoming")
      .pipe(restructure)
      .logProgress(/* every */    100 /* genes */, "outgoing")
      .pipe(coalescingHook)
      .pipe(outputWriter(outputPath))
  }

  // ===========================================================================
  /**
   * Combines the weights of every file returned by [[fileNames]] into a single stream.
   * Each file contributes its rows twice: once as read and once with Gene_A/Gene_B swapped.
   *
   * NOTE(review): `reduceLeft` presumably operates on a standard collection here, in which case it
   * throws when there are no files (e.g. `maxFiles = Some(0)`) - confirm whether that can happen.
   */
  def union(): HeadS =
    fileNames()
      .mapWithCheckpointingGroups(maxFiles)(checkpointingHook) { fileName => // vs a simple ".map { fileName =>" if not using spark
        weights(fileName)
          .union {
            // confirmed all interactions are symmetrical (see https://groups.google.com/g/genemania-discuss/c/Go4oXNHEhoQ)
            weights(fileName).swapEntries("Gene_A", "Gene_B") } }
      .reduceLeft(_ union _)

  // ---------------------------------------------------------------------------
  /**
   * Names of the weight files to process, taken from the File_Name column of [[networks]],
   * minus one known-empty file, optionally truncated to `maxFiles`, then sorted.
   *
   * NOTE(review): the `maxFiles` truncation is applied before sorting, so which files are kept
   * depends on the order `forceStrings` yields them (presumably the order in networks.txt).
   */
  def fileNames(): Seq[FileName] =
    networks()
      .forceStrings("File_Name")
      .filterNot(_ == "Co-expression.Honda-Kaneko-2010.txt") // empty (not even a header)
      .pipeOpt(maxFiles)(n => _.take(n))
      .sorted

  // ---------------------------------------------------------------------------
  /*
  excerpt:
  Gene_A Gene_B Weight
  ENSG00000000457 ENSG00000000460 1.2E-2
  ENSG00000001629 ENSG00000001631 1.8E-2
  ENSG00000000938 ENSG00000002834 3.7E-3
  ...
  */
  /**
   * Reads the weight file `fileName` from the input directory via [[weightInputReader]] and tags every row
   * with its originating file name.
   *
   * @param fileName file name as listed in networks.txt (without directory or compression suffix)
   */
  def weights(fileName: String): HeadS =
    s"${inputDirPath}/${fileName}${inputCompression}"
      .pipe(weightInputReader)
      .add("File_Name" -> fileName) // will be join key

  // ===========================================================================
  /**
   * Turns the flat (Gene_A, Gene_B, Weight, File_Name) rows into one entity per gene:
   * rows are joined with [[networks]], grouped by gene, and within each gene the interactions are
   * pivoted on the interaction group, each holding targets with their contexts sorted by descending weight.
   *
   * @param union the combined weight rows, as produced by [[union]]
   */
  def restructure(union: HeadS): HeadS =
    union
      .rename(
          "Gene_A" ~> _id, // they all seem to use ensembl, for humans at least
          "Gene_B" ~> "target",
          "Weight" ~> "weight")
      .innerJoin(networks().toViewBased) // will use hash join since right-hand side not distributed (view-based); also see t210322111234
      .remove("File_Name") // not needed after the join (redundant)
      .groupBy(_id).as("interactions") // will leverage GNU sort since .iteratorMode (see https://github.com/galliaproject/gallia-core#poor-mans-scaling-spilling)
      .transformAllEntities("interactions").using {
        _ .nest("network", "source", "pubmed", "weight").under("context")
          .group("context").by("interaction", "target")
          .transformAllEntities("context").using {
            // eg for ENSG00000006451 -> predicted -> ENSG00000116903 (0.71, then 0.12)
            _.sortByDescending("weight") }
          .nest("target", "context").under(_tmp)
          .group(_tmp).by ("interaction")
          .pivot(_tmp).column("interaction")
            .asNewKeys( // MUST provide until https://github.com/galliaproject/gallia-docs/blob/master/tasks.md#t210110094829 addressed
              "predicted"            ,
              "pathway"              ,
              "co_localization"      ,
              "genetic_interactions" ,
              "physical_interactions",
              "shared_protein_domains") }
      .unnestAllFrom("interactions")

  // ===========================================================================
  /*
  excerpt:
  File_Name Network_Group_Name Network_Name Source Pubmed_ID
  Predicted.I2D-BIND-Fly2Human.txt Predicted I2D-BIND-Fly2Human I2D 10871269
  Predicted.I2D-BIND-Mouse2Human.txt Predicted I2D-BIND-Mouse2Human I2D 10871269
  Predicted.I2D-BIND-Rat2Human.txt Predicted I2D-BIND-Rat2Human I2D 10871269
  ...
  */
  /**
   * Reads `networks.txt` via [[networkInputReader]] and normalizes it: columns are renamed to
   * interaction/network/source/pubmed, `pubmed` is dropped when it is "0", and `interaction`
   * is converted to a lower-case, underscore-separated key.
   */
  def networks(): HeadS =
    s"${inputDirPath}/networks.txt${inputCompression}"
      .pipe(networkInputReader)
      .rename(
          "Network_Group_Name" ~> "interaction",
          "Network_Name"       ~> "network",
          "Source"             ~> "source",
          "Pubmed_ID"          ~> "pubmed")
      .convert ("pubmed").toStr // integer-like but not intended as such
      .removeIfValueFor("pubmed").is("0")
      .transformString("interaction").using { // eg "Co-localization" --> "co_localization"
        // Locale.ROOT keeps the result independent of the JVM default locale, so it always matches the
        // literal keys passed to asNewKeys in restructure (e.g. under a Turkish default locale an
        // uppercase 'I' would otherwise lower-case to a dotless 'ı')
        _.replace(" ", "_").replace("-", "_").toLowerCase(java.util.Locale.ROOT) }
}
// ===========================================================================