From 237ff2409d5f42d49c61211a022a3a0091a11a30 Mon Sep 17 00:00:00 2001
From: Jochen Weile
Date: Thu, 28 Jun 2018 13:39:52 -0400
Subject: [PATCH] Improved and enabled background synchronization (Issue #17);
 added caching for structural features (Issue #4)

---
 R/calcStrucFeats.R |  10 ++++
 docker/daemon.R    |  11 ++--
 docker/sync.R      | 129 ++++++++++++++++++++++++++++++++-------
 3 files changed, 108 insertions(+), 42 deletions(-)

diff --git a/R/calcStrucFeats.R b/R/calcStrucFeats.R
index 19cca8a..635dab6 100644
--- a/R/calcStrucFeats.R
+++ b/R/calcStrucFeats.R
@@ -159,6 +159,14 @@ subcomplex.combos <- function(pdb.file,chain.sets) {
 #' }
 #' @export
 calc.strucfeats <- function(pdb.acc,main.chain) {
+
+    struc.cache.file <- getCacheFile(paste0(pdb.acc,":",main.chain,"_features.csv"))
+
+    if (file.exists(struc.cache.file)) {
+        cat("Using cached features...\n")
+        burial.all <- read.csv(struc.cache.file)
+        return(burial.all)
+    }
 
     library("httr")
     set_config(config(ssl_verifypeer = 0L))
@@ -238,6 +246,8 @@ calc.strucfeats <- function(pdb.acc,main.chain) {
     burial.all <- cbind(burial[as.character(allpos),],secstruc=secstruc[as.character(allpos),"struc"])
     rownames(burial.all) <- burial.all$pos <- allpos
 
+    write.table(burial.all,struc.cache.file,sep=",")
+
     # cat("Deleting temporary files...\n")
     # #clean up temp files
     # file.remove(pdb.file)
diff --git a/docker/daemon.R b/docker/daemon.R
index 4097d89..881633d 100644
--- a/docker/daemon.R
+++ b/docker/daemon.R
@@ -62,8 +62,7 @@ daemon <- function() {
     #infinite loop
     while(TRUE) {
         #start DB synchronization if necessary
-        #TODO: enable once production DB is updated
-        # check.sync()
+        check.sync()
         #patrol the directory for new jobs
         patrol()
         #sleep for two seconds until next patrol
@@ -80,10 +79,10 @@ lastSyncTime <- as.Date("2018-01-01")
 #check whether 1 day has passed since the last synchronization, if so run it.
 check.sync <- function() {
     #calculate time passed since last synchronization
-    daysSinceSync <- difftime(Sys.time(), lastSyncTime, units = "days")
-    #if more than one day has passed
-    if (daysSinceSync > 1) {
-        #start the sync job
+    minSinceSync <- difftime(Sys.time(), lastSyncTime, units = "mins")
+    #if more than five minutes have passed
+    if (minSinceSync > 5) {
+        #start a sync job
         system(
             paste("Rscript sync.R"),
             wait=FALSE
diff --git a/docker/sync.R b/docker/sync.R
index 70e1ce4..9e4a18a 100644
--- a/docker/sync.R
+++ b/docker/sync.R
@@ -2,16 +2,15 @@
 
 options(stringsAsFactors=FALSE)
 
-#This requires the new changes that are in the rapimave devel branch!!
 library(rapimave)
 library(hgvsParseR)
 library(yogitools)
 library(mavevis)
-baseURL <- "http://ec2-13-210-169-246.ap-southeast-2.compute.amazonaws.com/api/"
+
+baseURL <- "https://www.mavedb.org/api/"
 
 #Caching directory
-# cache.dir <- paste0(tempdir(),"/")
 cache.dir <- Sys.getenv("MAVEVIS_CACHE",unset="/var/www/mavevis/cache/")
 if (!file.exists(cache.dir)) {
     stop("Cache directory does not exist!")
 }
@@ -76,14 +75,6 @@ calcOffset <- function(uniprot.acc, maveSeq) {
 
 logger("Starting new synchronization cycle.")
 
-#Open existing scorest index
-indexFile <- paste0(cache.dir,"searchIndex.csv")
-if (file.exists(indexFile)) {
-    index <- read.csv(indexFile)
-} else {
-    index <- NULL
-}
-
 tryCatch({
 
     #Open API connection
@@ -92,11 +83,11 @@ tryCatch({
     #Query list of scoresets
     scoresets <- rmave$getAllScoreSets()
 
-    #Iterate overscoresets
-    invisible(lapply(scoresets,function(scoreset) {
+    #Iterate over scoresets and build index
+    index <- as.df(lapply(scoresets,function(scoreset) {
 
+        #If it's an outdated scoreset, skip it!
         if (!is.null(scoreset$getNextVersion())) {
-            #it's an outdated scoreset!
             return(NULL)
         }
 
@@ -104,37 +95,53 @@ tryCatch({
         name <- scoreset$getTitle()
 
         #No need to process if it's already known
-        if (!is.null(index) && urn %in% index$urn) {
-            return(NULL)
-        }
-
-        logger(paste("New scoreset found:",urn))
+        # if (!is.null(index) && urn %in% index$urn) {
+        #     return(NULL)
+        # }
 
         target <- scoreset$getTarget()
         tname <- target$getName()
         wtseq <- target$getSequence()
         uniprot <- target$getXrefUniprot()
 
-        value <- paste0(urn,": ",tname," - ",name)
-        label <- paste0(tname," - ",name)
+        #If the title already contains the target name, there's no need to repeat it
+        if (grepl(tname,name)) {
+            value <- paste0(urn,": ",name)
+            label <- paste0(name)
+        } else {
+            value <- paste0(urn,": ",tname," - ",name)
+            label <- paste0(tname," - ",name)
+        }
 
-        #Download scores and write to cache location
-        scoreTable <- rmave$getScores(urn)
+        #Check if scores are already cached
         scoreCacheFile <- paste0(cache.dir,urn,".csv")
-        write.table(scoreTable,scoreCacheFile,sep=",",row.names=FALSE)
+        if (!file.exists(scoreCacheFile)) {
+            #If not, download scores and write to cache location
+            scoreTable <- rmave$getScores(urn)
+            write.table(scoreTable,scoreCacheFile,sep=",",row.names=FALSE)
+        }
 
-        #Parse score file to check for presence of syn/stop
-        if (grepl(" \\(",scoreTable$hgvs[[1]])) {
-            hgvsp <- sub("\\)$","",sapply(strsplit(scoreTable$hgvs," \\("),`[[`,2))
+        #Check if variant descriptors have already been cached
+        mutCacheFile <- paste0(cache.dir,urn,"_muts.csv")
+        if (!file.exists(mutCacheFile)) {
+            #If not, do so
+            if (!exists("scoreTable")) {
+                scoreTable <- read.csv(scoreCacheFile)
+            }
+            if (!all(is.na(scoreTable$hgvs_pro))) {
+                varInfo <- parseHGVS(scoreTable$hgvs_pro)
+                write.table(varInfo,mutCacheFile,sep=",",row.names=FALSE)
+            } else {
+                logger(paste("WARNING: Scoreset",urn,"has no protein-level variant descriptors."))
+                return(NULL)
+            }
         } else {
-            hgvsp <- scoreTable$hgvs
+            varInfo <- read.csv(mutCacheFile)
         }
-        varInfo <- parseHGVS(hgvsp)
 
-        #Cache varInfo
-        mutCacheFile <- paste0(cache.dir,urn,"_muts.csv")
-        write.table(varInfo,mutCacheFile,sep=",",row.names=FALSE)
+
+        #Parse score file to check for presence of syn/stop
 
         #Get off set or calculate if necessary
         offset <- uniprot$getOffset()
         if (is.null(offset)) {
@@ -145,19 +152,69 @@ tryCatch({
         hasStop <- any(varInfo$variant %in% c("Ter","*"))
         hasSyn <- any(varInfo$type == "synonymous")
 
+
        #add scoreset information to index
-        index <<- rbind(index,data.frame(
+        list(
             value=value,label=label,urn=urn,target=tname,
             uniprot=uniprot$getID(),
             syn=if (hasSyn) "auto" else "manual",
             stop=if (hasStop) "auto" else "manual",
             offset=offset,
             wt=wtseq
-        ))
+        )
 
-        logger("...cached and indexed")
 
     }))
 
-    #TODO: pre-cache alignments, PDB files, and structure tracks.
+    indexFile <- paste0(cache.dir,"searchIndex.csv")
+    write.table(index,indexFile,sep=",",row.names=FALSE)
+
+    logger("Index successfully updated.")
+
+    logger("Starting caching cycle.")
+
+    #pre-cache alignments, PDB files, and structure tracks.
+    invisible(lapply(index$uniprot,function(acc) {
+
+        #check if pre-calculated alignment exists. If not, create it.
+        alignment.file <- getCacheFile(paste0(acc,"_alignment.fasta"))
+        if (!file.exists(alignment.file)) {
+            logger(paste("Caching alignment for",acc))
+            calc.conservation(acc)
+        }
+
+        #check if pre-calculated pdb table exists. If not, create it.
+        pdb.table.file <- getCacheFile(paste0(acc,"_pdbs.csv"))
+        if (!file.exists(pdb.table.file)) {
+
+            logger(paste("Caching structures for",acc))
+            pdb.table <- find.pdbs(acc)
+
+            #iterate over associated pdb structures
+            apply(pdb.table,1,function(pdb.row) {
+                pdbacc <- pdb.row[["pdb"]]
+                mainChains <- strsplit(pdb.row[["mainChains"]],"/")[[1]]
+                #iterate over possible main chains
+                lapply(mainChains,function(mc) {
+                    #check if pre-calculated structure data exists. If not, create it.
+                    struc.cache.file <- getCacheFile(paste0(pdbacc,":",mc,"_features.csv"))
+                    if (!file.exists(struc.cache.file)) {
+                        logger(paste("Caching features for",acc,":",pdbacc,"-",mc))
+                        tryCatch({
+                            calc.strucfeats(pdbacc,mc)
+                        },
+                        error=function(e) {
+                            logger(paste(
+                                "ERROR: Features calculation failed for",
+                                acc,":",pdbacc,"-",mc,"\n",e
+                            ))
+                        })
+                    }
+                })
+            })
+
+        }
+
+    }))
+
     logger("Synchronization complete.")
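
Note (not part of the patch): both calc.strucfeats and the caching cycle above rely on the same guard idiom -- resolve a file via getCacheFile(), reuse it if it already exists, otherwise compute the result and write it out so the next daemon cycle can skip the work. A minimal sketch of that idiom, assuming the mavevis helpers getCacheFile() and calc.strucfeats() as used in the patch; the wrapper name cacheCSV and the PDB accession/chain in the usage example are illustrative only:

library(mavevis)

#Illustrative helper (not in the patch): memoize any table-producing call as a CSV in the cache directory.
cacheCSV <- function(key, compute) {
    cache.file <- getCacheFile(paste0(key, ".csv"))
    if (file.exists(cache.file)) {
        #cache hit: reuse the previously written table
        return(read.csv(cache.file))
    }
    #cache miss: compute the table, persist it for later cycles, then return it
    result <- compute()
    write.table(result, cache.file, sep=",", row.names=FALSE)
    result
}

#Usage sketch (hypothetical accession "1ABC", chain "A"): computed once, served from cache afterwards.
#feats <- cacheCSV("1ABC:A_features", function() calc.strucfeats("1ABC", "A"))

Keying each cache file by accession (and chain) is what lets an interrupted or repeated synchronization cycle resume cheaply: anything already on disk is skipped, and only missing entries trigger downloads or structure calculations.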