Skip to content

Commit

Permalink
Merge pull request #509 from NEONScience/fix.asgn.same.start.end
Browse files Browse the repository at this point in the history
Fix.asgn.same.start.end
  • Loading branch information
covesturtevant authored Nov 28, 2024
2 parents c579ff5 + 807eeca commit d3565cf
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 18 deletions.
2 changes: 1 addition & 1 deletion flow/flow.loc.grp.asgn/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Dockerfile for NEON IS Data Processing - Calibration assignment

# Start with the base image.
FROM us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pack-base-r:v1.4.7
FROM us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pack-base-r:sha-92ae3cc

ARG FLOW_DIR="./flow"
ARG APP_DIR="flow.loc.grp.asgn"
Expand Down
4 changes: 3 additions & 1 deletion flow/flow.loc.grp.asgn/flow.loc.grp.asgn.R
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@
# Add option for group files
# Cove Sturtevant (2023-11-16)
# Add option for retaining only particular properties
# Cove Sturtevant (2024-11-27)
# Allow good records to pass through, while removing bad records and routing to errored datums
##############################################################################################
library(foreach)
library(doParallel)
Expand Down Expand Up @@ -186,7 +188,7 @@ foreach::foreach(idxDirIn = DirIn) %dopar% {
call.stack=call.stack,
DirDatm=idxDirIn,
DirErrBase=Para$DirErr,
RmvDatmOut=TRUE,
RmvDatmOut=FALSE, # Set to FALSE so that good records make it through
DirOutBase=Para$DirOut,
log=log
)
Expand Down
47 changes: 42 additions & 5 deletions flow/flow.loc.grp.asgn/wrap.loc.grp.asgn.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
# incorporate IS default processing start date
# Cove Sturtevant (2022-11-22)
# Add option for group files
# Cove Sturtevant (2024-11-27)
# Allow good records to pass through, while removing bad records and routing to errored datums
##############################################################################################
wrap.loc.grp.asgn <- function(DirIn,
DirOutBase,
Expand All @@ -86,6 +88,8 @@ wrap.loc.grp.asgn <- function(DirIn,
stop()
}

flagErr <- FALSE # Initialize trigger for indicating that at least one failure has occurred

# Directory listing of files for this datum
file <- base::dir(DirIn)
numFile <- base::length(file)
Expand Down Expand Up @@ -216,12 +220,41 @@ wrap.loc.grp.asgn <- function(DirIn,
if (timeEndIdxRow == timeAsgn$timeEnd[idxRow]){
timeEndIdxRow <- timeEndIdxRow-base::as.difftime(1,units='days')
}
base::seq.POSIXt(from=timeBgnIdxRow,
to=timeEndIdxRow,
by='day')

# Allow good records to pass through, while removing bad records and routing to errored datums
tryCatch(
rpt <- base::seq.POSIXt(from=timeBgnIdxRow,
to=timeEndIdxRow,
by='day'
),
# If errored, return NULL
error=function(err) {
log$error(base::paste0('Bad time range for SRF ID ',
srf$id[idxRow],
'. Datum ',
DirIn,
' will be routed to errored datums but good SRFs will still be assigned.'
)
)
rpt <- NULL
}
)
}
)

# Mark errors for routing to errored datums
setErr <- base::unlist(base::lapply(ts,base::is.null))
if(base::any(setErr)){
flagErr <- TRUE

# Remove errors
ts <- ts[!setErr]

# Skip if no unerrored time ranges
if(base::length(ts) == 0){
next
}
}

ts <- base::unique(base::do.call(c,ts)) # unlist
base::attr(ts,'tzone') <- base::attr(TimeBgn,'tzone') # Re-assign time zone, which was stripped in unlisting
tsChar <- base::unique(format(ts,format='%Y/%m/%d')) # Format as year/month/day repo structure and get rid of duplicates
Expand Down Expand Up @@ -274,7 +307,11 @@ wrap.loc.grp.asgn <- function(DirIn,
} # End loop around files



# Return error if a failure occurred
if(flagErr == TRUE){
stop()
}

return()

} # End function
2 changes: 1 addition & 1 deletion flow/flow.srf.asgn/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# docker build -t neon-is-srf-asgn-r .

# Start with the base image.
FROM us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pack-pub-r:v1.0.2
FROM us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pack-pub-r:sha-f9cd4ba

ARG FLOW_DIR="./flow"
ARG APP_DIR="flow.srf.asgn"
Expand Down
4 changes: 3 additions & 1 deletion flow/flow.srf.asgn/flow.srf.asgn.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
# changelog and author contributions / copyrights
# Cove Sturtevant (2023-01-26)
# original creation, refactored from flow.loc.grp.asgn
# Cove Sturtevant (2024-11-27)
# Allow good records to pass through, while removing bad records and routing to errored datums
##############################################################################################
library(foreach)
library(doParallel)
Expand Down Expand Up @@ -153,7 +155,7 @@ foreach::foreach(idxDirIn = DirIn) %dopar% {
call.stack=call.stack,
DirDatm=idxDirIn,
DirErrBase=Para$DirErr,
RmvDatmOut=TRUE,
RmvDatmOut=FALSE, # Set to FALSE so that good SRF records make it through
DirOutBase=Para$DirOut,
log=log
)
Expand Down
46 changes: 42 additions & 4 deletions flow/flow.srf.asgn/wrap.srf.asgn.R
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
# changelog and author contributions / copyrights
# Cove Sturtevant (2023-01-27)
# original creation
# Cove Sturtevant (2024-11-27)
# Allow good records to pass through, while removing bad records and routing to errored datums
##############################################################################################
wrap.srf.asgn <- function(DirIn,
DirOutBase,
Expand All @@ -63,6 +65,8 @@ wrap.srf.asgn <- function(DirIn,
log <- NEONprocIS.base::def.log.init()
}

flagErr <- FALSE # Initialize trigger for indicating that at least one failure has occurred

# Directory listing of files for this datum
file <- base::dir(DirIn)
numFile <- base::length(file)
Expand Down Expand Up @@ -135,12 +139,41 @@ wrap.srf.asgn <- function(DirIn,
if (timeEndIdxRow == timeAsgn$timeEnd[idxRow]){
timeEndIdxRow <- timeEndIdxRow-base::as.difftime(1,units='days')
}
base::seq.POSIXt(from=timeBgnIdxRow,
to=timeEndIdxRow,
by='day')

# Allow good records to pass through, while removing bad records and routing to errored datums
tryCatch(
rpt <- base::seq.POSIXt(from=timeBgnIdxRow,
to=timeEndIdxRow,
by='day'
),
# If errored, return NULL
error=function(err) {
log$error(base::paste0('Bad time range for SRF ID ',
srf$id[idxRow],
'. Datum ',
DirIn,
' will be routed to errored datums but good SRFs will still be assigned.'
)
)
rpt <- NULL
}
)
}
)

# Mark errors for routing to errored datums
setErr <- base::unlist(base::lapply(ts,base::is.null))
if(base::any(setErr)){
flagErr <- TRUE

# Remove errors
ts <- ts[!setErr]

# Skip if no unerrored time ranges
if(base::length(ts) == 0){
next
}
}

ts <- base::unique(base::do.call(c,ts)) # unlist
base::attr(ts,'tzone') <- base::attr(TimeBgn,'tzone') # Re-assign time zone, which was stripped in unlisting
tsChar <- base::unique(format(ts,format='%Y/%m/%d')) # Format as year/month/day repo structure and get rid of duplicates
Expand Down Expand Up @@ -172,6 +205,11 @@ wrap.srf.asgn <- function(DirIn,

} # End loop around files

# Return error if a failure occurred
if(flagErr == TRUE){
stop()
}

return()

} # End function
2 changes: 1 addition & 1 deletion pack/NEONprocIS.base/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: NEONprocIS.base
Title: Basic functions for NEON IS data processing
Version: 1.4.7
Version: 1.4.8
Authors@R:
person(given = "Cove",
family = "Sturtevant",
Expand Down
16 changes: 13 additions & 3 deletions pack/NEONprocIS.base/R/def.dir.in.R
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,19 @@ def.dir.in <- function(DirBgn,nameDirSub,log=NULL){
# If there are no subdirectories expected in the datum directory, then each terminal directory is a datum
if(base::length(nameDirSub) == 0){
# Find the ones that don't nest in anything else
setMtch <- base::unlist(base::lapply(dirAll,FUN=function(idxDir){
base::sum(base::grepl(pattern=idxDir,x=dirAll,fixed=FALSE))==1
}))
setMtch <- base::unlist(
base::lapply(dirAllSplt,FUN=function(idxDirSplt){
base::sum(
base::unlist(
base::lapply(dirAllSplt,FUN=function(idxDirAllSplt){
base::all(
idxDirSplt %in% idxDirAllSplt
)
})
)
) <= 1
})
)
DirIn <- base::unique(dirAll[setMtch])

} else {
Expand Down
2 changes: 1 addition & 1 deletion pack/NEONprocIS.pub/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


# Start with the neon-is-base-r image.
FROM us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pack-base-r:v1.4.7
FROM us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pack-base-r:sha-92ae3cc

# maintainer handle
MAINTAINER "Cove Sturtevant" [email protected]
Expand Down

0 comments on commit d3565cf

Please sign in to comment.