-
Notifications
You must be signed in to change notification settings - Fork 0
/
ParallelSLURM_processingtemplate.R
138 lines (117 loc) · 6.26 KB
/
ParallelSLURM_processingtemplate.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
##---- REQUIRED INPUTS ----##
PROJECT_NAME <- "ExampleLinkage" # string with a project name
rasterdir <- "/pc/nhair0a/Built_Environment/BE_Data/Geographic_Data/PRISM_daily/PRISM_data/an" # string with a file path to raster covariates to extract- function will try to pull variable names from sub directories i.e /PRISM/ppt or /PRISM/tmean or /NDVI/30m
extractionlayer <- "/d/tmp/nhairs/nhair0a/BellaviaLinkage/sites_10M/sites_10M.shp" # string with path to spatial layer to use for extraction. Can be a CSV or SHP or GDB
layername <- "sites_10M" # Layer name used when extraction layer is an SHP or GDB, ignored when extraction layer is a CSV
IDfield <- "ORIG_FID" # Field in extraction layer specifying IDs for features, can be unique or not, used to chunk up batch jobs
Xfield <- "X" # A Field containing the X coordinate (Longitude), in decimal degrees, only for CSV
Yfield <- "Y" # A Field containing the Y coordinate (Latitude), in decimal degrees, only for CSV
startdatefield <- "start_date" # Field in extraction layer specifying first date of observations
enddatefield <- "end_date" # Field in extraction layer specifying last date of observations
predays <- 0 # Integer specifying how many days preceding 'startdatefield' to extract data. i.e. 365 will mean data extraction will begin 1 year before startdatefield
weights <- NA # string specifying file path to raster weights, should only be used when extraction layer is a polygon layer
period <- "daily" # specify a period to summarize values: daily, monthly, yearly, defaults to daily
email <- "" # Enter your email. SLURM will send you an email when your jobs are complete.
##---- Required Packages
# Install (if missing) and attach every package this script depends on.
listOfPackages <- c("batchtools", "terra", "tools", "reshape2", "ids")
for (pkg in listOfPackages) {
  # Compare against the "Package" column only. The original tested
  # membership against the entire installed.packages() matrix (versions,
  # paths, etc.), which is sloppy and can mis-match.
  if (!pkg %in% rownames(installed.packages())) {
    install.packages(pkg, dependencies = TRUE)
  }
  # library() fails loudly if the package still cannot be attached,
  # unlike require(), which only returns FALSE and lets the script
  # continue into confusing downstream errors.
  library(pkg, character.only = TRUE)
}
##REQUIRED##
##---- Initialize batchtools configuration files and template
# Fetch the SLURM template and the batchtools configuration from GitHub,
# unless copies are already present in the working directory.
if (file.exists("slurm.tmpl")) {
  print("template exists")
} else {
  download.file("https://raw.githubusercontent.com/WillhKessler/GCMC_RScripts/main/slurm.tmpl", "slurm.tmpl")
}

if (file.exists("batchtools.conf.R")) {
  print("conf file exists")
} else {
  download.file("https://raw.githubusercontent.com/WillhKessler/GCMC_RScripts/main/batchtools.conf.R", "batchtools.conf.R")
}
#Create a temporary registry item
# Re-attach to an existing on-disk registry for this project if one is
# present; otherwise create a fresh registry with a fixed seed so job
# ordering is reproducible.
registrydir <- paste(PROJECT_NAME, "Registry", sep = "_")
if (file.exists(registrydir)) {
  reg <- loadRegistry(registrydir, writeable = TRUE, conf.file = "batchtools.conf.R")
} else {
  reg <- makeRegistry(file.dir = registrydir, seed = 42, conf.file = "batchtools.conf.R")
}
##########Input PROCESSING HERE####################################################
## Call Desired functions from Functions_RasterExtraction source file
## The desired functions are mapped in creating the jobs via batchMap
# NOTE(review): this sources extract.rast (referenced by batchMap below)
# straight from GitHub, so the script needs network access at run time;
# consider pinning to a commit hash for reproducibility.
source("https://raw.githubusercontent.com/WillhKessler/GCMC_RScripts/refs/heads/main/Functions_RasterExtraction.R")
##---- Set up the batch processing jobs
##---- Use the 'batchgrid' function to create a grid of variable combinations to process over. function considers input rasters, input features, and any weighting layers
# Build one row per (raster variable, feature ID) combination, carrying the
# remaining extraction parameters along in every row, for use with batchMap.
#
# Arguments:
#   rasterdir       - directory whose immediate sub-directories name the raster variables
#   period          - summary period ("daily", "monthly", "yearly")
#   extractionlayer - path to a CSV, SHP, or GDB holding the features
#   layername       - layer name within an SHP/GDB (ignored for CSV)
#   IDfield         - feature-ID field used to chunk up batch jobs
#   Xfield, Yfield  - coordinate fields (CSV only; forced to NA for SHP/GDB)
#   startdatefield, enddatefield - per-feature observation date-range fields
#   predays         - days before startdatefield to begin extraction
#   weightslayers   - path to raster weights (polygon layers only) or NA
#
# Side effect: writes a "*_tmp" copy of the extraction layer tagged with a
# sequential OID column, and points the returned grid at that copy.
# Returns: data.frame from expand.grid(), stringsAsFactors = FALSE.
batchgrid <- function(rasterdir, period, extractionlayer, layername, IDfield,
                      Xfield, Yfield, startdatefield, enddatefield, predays,
                      weightslayers) {
  require("tools")
  ##---- Set up the batch processing jobs
  # Variable names come from the immediate sub-directories, e.g. /PRISM/ppt.
  pvars <- list.dirs(path = rasterdir, full.names = FALSE, recursive = FALSE)

  if (file_ext(extractionlayer) == "csv") {
    # Tag every row with a guaranteed-unique sequential OID and write a
    # *_tmp copy so jobs can address rows by that ID.
    feature <- read.csv(extractionlayer, stringsAsFactors = FALSE)
    feature$OID <- seq_len(nrow(feature))
    write.csv(x = feature,
              file = paste0(file_path_sans_ext(extractionlayer), "_tmp", ".csv"),
              row.names = FALSE)
    feature <- feature$OID
    layername <- NA       # no layer concept for CSV
    weightslayers <- NA   # weights only apply to polygon layers
    extractionlayer <- paste0(file_path_sans_ext(extractionlayer), "_tmp", ".csv")
    IDfield <- "OID"
  } else if (file_ext(extractionlayer) %in% c("shp", "gdb")) {
    require("terra")
    vectorfile <- vect(x = extractionlayer, layer = layername)
    vectorfile$OID <- seq_len(nrow(vectorfile))
    writeVector(x = vectorfile,
                filename = paste0(file_path_sans_ext(extractionlayer), "_tmp.",
                                  file_ext(extractionlayer)),
                layer = layername, overwrite = TRUE)
    feature <- unlist(unique(values(vectorfile[, "OID"])))
    Xfield <- NA  # coordinates come from geometry, not attribute fields
    Yfield <- NA
    extractionlayer <- paste0(file_path_sans_ext(extractionlayer), "_tmp.",
                              file_ext(extractionlayer))
    IDfield <- "OID"
    if (file_ext(extractionlayer) == "shp") {
      # BUGFIX: a shapefile's layer name is its basename without the
      # extension. The original appended "_tmp" to the full (already
      # reassigned) tmp-file path, producing an invalid layer name such
      # as ".../sites_10M_tmp.shp_tmp".
      layername <- file_path_sans_ext(basename(extractionlayer))
    }
  }

  # One job per (variable, feature) pair; scalar parameters replicate.
  expand.grid(vars = pvars,
              period = period,
              piece = feature,
              rasterdir = rasterdir,
              extractionlayer = extractionlayer,
              layername = layername,
              IDfield = IDfield,
              Xfield = Xfield,
              Yfield = Yfield,
              startdatefield = startdatefield,
              enddatefield = enddatefield,
              predays = predays,
              weightslayers = weightslayers,
              stringsAsFactors = FALSE)
}
##---- Make sure registry is empty
# Drop any jobs left over from a previous run so job IDs start from 1.
clearRegistry(reg)

##---- create jobs from variable grid
# One batchtools job per row of the batchgrid() data.frame; each job calls
# extract.rast (sourced above) with that row's columns as arguments.
jobs <- batchMap(fun = extract.rast,
                 batchgrid(rasterdir = rasterdir,
                           # BUGFIX: 'period' is a formal of batchgrid() with
                           # no default and was omitted from the original
                           # call, which fails with
                           # "argument 'period' is missing, with no default".
                           period = period,
                           extractionlayer = extractionlayer,
                           layername = layername,
                           IDfield = IDfield,
                           Xfield = Xfield,
                           Yfield = Yfield,
                           startdatefield = startdatefield,
                           enddatefield = enddatefield,
                           predays = predays,
                           weightslayers = weights),
                 reg = reg)

# Group jobs into at most 100 chunks so the scheduler sees fewer submissions.
jobs$chunk <- chunk(jobs$job.id, n.chunks = 100)
# Job names: abbreviated project name + job id, e.g. "ExmL1".
setJobNames(jobs, paste0(abbreviate(PROJECT_NAME), jobs$job.id), reg = reg)
getJobTable()
getStatus()
##---- Submit jobs to scheduler
# Submit every chunked job to SLURM via batchtools. Resource names map to
# placeholders in the downloaded slurm.tmpl.
# NOTE(review): walltime=3600000 looks like milliseconds-or-seconds confusion
# (1000 hours if seconds) — confirm the unit expected by slurm.tmpl.
done <- batchtools::submitJobs(jobs,
reg=reg,
resources=list(partition="linux01", walltime=3600000, ntasks=1, ncpus=1, memory=5000,email=email))
#Sys.sleep(1000)
#estimateRuntimes(jobs,reg=reg)
getStatus()
waitForJobs() # Wait until jobs are completed