-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
rstevenson
committed
Nov 21, 2017
0 parents
commit ccb1080
Showing
16 changed files
with
1,009 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
^.*\.Rproj$ | ||
^\.Rproj\.user$ | ||
cran-comments.md |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
.Rproj.user | ||
.Rhistory | ||
.RData | ||
.Ruserdata | ||
inst/doc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
Package: condusco | ||
Type: Package | ||
Title: Query-Driven Pipeline Execution and Query Templates | ||
Version: 0.1.0 | ||
Author: Roland Stevenson | ||
Maintainer: Roland Stevenson <[email protected]> | ||
Description: Runs a function iteratively over each row of either a dataframe | ||
or the results of a query. Use the 'BigQuery' and 'DBI' wrappers to | ||
iteratively pass each row of query results to a function. If a field | ||
contains a 'JSON' string, it will be converted to an object. This is | ||
helpful for queries that return 'JSON' strings that represent objects. | ||
These fields can then be treated as objects by the pipeline. | ||
License: GPL-3 | ||
URL: https://github.com/ras44/condusco | ||
BugReports: https://github.com/ras44/condusco/issues | ||
Encoding: UTF-8 | ||
LazyData: true | ||
Suggests: knitr, rmarkdown, whisker, testthat, RSQLite | ||
VignetteBuilder: knitr | ||
Depends: R (>= 3.3.2), | ||
jsonlite, | ||
assertthat, | ||
bigrquery, | ||
DBI | ||
RoxygenNote: 6.0.1.9000 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# Generated by roxygen2: do not edit by hand | ||
|
||
export(run_pipeline) | ||
export(run_pipeline_dbi) | ||
export(run_pipeline_gbq) | ||
import(DBI) | ||
import(assertthat) | ||
import(bigrquery) | ||
import(jsonlite) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# News | ||
|
||
## 0.1.0 | ||
condusco now contains the following functions | ||
run_pipeline | ||
run_pipeline_dbi | ||
run_pipeline_gbq |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
#' Runs a user-provided pipeline once for each row of parameters, converting
#' any JSON strings to objects
#'
#' Each row of \code{parameters} is turned into a named list. Every field whose
#' value parses as JSON is replaced by the parsed object; every other field is
#' passed through unchanged. The list is then handed to \code{pipeline}.
#'
#' @param pipeline User-provided function with one argument, a named list
#'   holding the fields of one row of \code{parameters}
#' @param parameters A dataframe with one row per pipeline invocation; fields
#'   containing JSON strings are converted to objects
#'
#' @return The result of \code{apply()} over the rows of \code{parameters},
#'   i.e. the collected return values of \code{pipeline}. An empty list when
#'   \code{parameters} has no rows.
#'
#' @import assertthat jsonlite
#'
#' @examples
#'
#' library(whisker)
#'
#' run_pipeline(
#'   function(params){
#'     query <- "SELECT result FROM {{table_prefix}}_results;"
#'     whisker.render(query,params)
#'   },
#'   data.frame(
#'     table_prefix = c('batman', 'robin')
#'   )
#' )
#'
#' @export
run_pipeline <- function(pipeline, parameters){

  assert_that(length(parameters)>0)

  # apply() on a zero-extent input still calls its function once, with a
  # placeholder row, to work out the shape of the result. Without this guard
  # an empty parameter table would run the pipeline once on bogus values.
  if (isTRUE(nrow(parameters) == 0L)) {
    return(list())
  }

  # For each row in parameters, convert each field to a JSON object if it
  # contains JSON. apply() hands the row over after coercing the dataframe to
  # a matrix, so in a mixed-type table every field arrives as a string.
  apply(parameters, 1, function(row){
    lr <- as.list(row)
    for(n in names(lr)){
      value <- lr[[n]]
      # The fallback has to be the *value* of tryCatch(): an assignment made
      # inside the handler would only change the handler's own copy of lr.
      # Fields that are not JSON are kept exactly as they came in.
      lr[[n]] <- tryCatch(
        fromJSON(value, simplifyVector=FALSE),
        error=function(e) value
      )
    }

    pipeline(lr)
  })

}
|
||
|
||
#' Run a pipeline over the rows returned by a Google BigQuery query
#'
#' Executes \code{query} with \code{query_exec()} and passes the resulting
#' table to \code{run_pipeline()}, which invokes \code{pipeline} once per row.
#'
#' @param pipeline User-provided function with one argument, one row of query results
#' @param query A query to execute in Google BigQuery
#' @param project The Google BigQuery project to bill
#' @param ... Additional arguments passed to query_exec()
#'
#' @return The value returned by \code{run_pipeline()} for the query results.
#'
#' @import bigrquery
#'
#' @examples
#'
#'\dontrun{
#' library(whisker)
#'
#' #Set GBQ project
#' project <- ''
#'
#' #Set the following options for GBQ authentication on a cloud instance
#' options("httr_oauth_cache" = "~/.httr-oauth")
#' options(httr_oob_default=TRUE)
#'
#' #Run the below query to authenticate and write credentials to .httr-oauth file
#' query_exec("SELECT 'foo' as bar",project=project);
#'
#' pipeline <- function(params){
#'
#'   query <- "
#'     SELECT
#'       {{#list}}
#'       SUM(CASE WHEN author.name ='{{name}}' THEN 1 ELSE 0 END) as n_{{name_clean}},
#'       {{/list}}
#'       repo_name
#'     FROM `bigquery-public-data.github_repos.sample_commits`
#'     GROUP BY repo_name
#'   ;"
#'
#'   res <- query_exec(
#'     whisker.render(query,params),
#'     project=project,
#'     use_legacy_sql = FALSE
#'   );
#'
#'   print(res)
#' }
#'
#' run_pipeline_gbq(pipeline, "
#'   SELECT CONCAT('[',
#'     STRING_AGG(
#'       CONCAT('{\"name\":\"',name,'\",'
#'         ,'\"name_clean\":\"', REGEXP_REPLACE(name, r'[^[:alpha:]]', ''),'\"}'
#'       )
#'     ),
#'   ']') as list
#'   FROM (
#'     SELECT author.name,
#'       COUNT(commit) n_commits
#'     FROM `bigquery-public-data.github_repos.sample_commits`
#'     GROUP BY 1
#'     ORDER BY 2 DESC
#'     LIMIT 10
#'   )
#'   ",
#'   project,
#'   use_legacy_sql = FALSE
#' )
#'}
#' @export
run_pipeline_gbq <- function(pipeline, query, project, ... ){

  # Execute the invocation query up front; every row of its result is one set
  # of arguments for the pipeline.
  invocation_rows <- query_exec(query, project = project, ...)

  run_pipeline(pipeline, invocation_rows)

}
|
||
#' A wrapper for running pipelines with a DBI connection invocation query
#'
#' Sends \code{query} over \code{con}, fetches the result, clears the result
#' set, and then passes the fetched rows to \code{run_pipeline()}. The result
#' set is cleared even when fetching fails, and always before the pipeline
#' starts, so the pipeline can issue its own queries on \code{con}.
#'
#' @param pipeline User-provided function with one argument, one row of query results
#' @param query A query to execute via the DBI connection
#' @param con The DBI connection
#' @param ... Additional arguments passed to dbSendQuery(), dbFetch() and
#'   dbClearResult()
#'
#' @return The value returned by \code{run_pipeline()} for the fetched rows.
#'
#' @import DBI
#'
#' @examples
#'
#'\dontrun{
#' library(whisker)
#' library(RSQLite)
#'
#' con <- dbConnect(RSQLite::SQLite(), ":memory:")
#'
#' dbWriteTable(con, "mtcars", mtcars)
#'
#' #for each cylinder count, count the number of top 5 hps it has
#' pipeline <- function(params){
#'
#'   query <- "SELECT
#'     {{#list}}
#'     SUM(CASE WHEN hp='{{val}}' THEN 1 ELSE 0 END )as n_hp_{{val}},
#'     {{/list}}
#'     cyl
#'     FROM mtcars
#'     GROUP BY cyl
#'   ;"
#'
#'
#'   dbGetQuery(
#'     con,
#'     whisker.render(query,params)
#'   )
#' }
#'
#'
#' #pass the top 5 most common hps as val params
#' run_pipeline_dbi(
#'   pipeline,
#'   '
#'   SELECT "[" || GROUP_CONCAT("{ ""val"": """ || hp || """ }") || "]" AS list
#'   FROM (
#'     SELECT
#'       CAST(hp as INTEGER) as HP,
#'       count(hp) as cnt
#'     FROM mtcars
#'     GROUP BY hp
#'     ORDER BY cnt DESC
#'     LIMIT 5
#'   )
#'   ',
#'   con
#' )
#'
#'
#' dbDisconnect(con)
#'}
#' @export
run_pipeline_dbi <- function(pipeline, query, con, ...){

  rs <- dbSendQuery(con, query, ...)

  # Clear the result set in `finally` so it is released even if dbFetch()
  # raises; otherwise the open result would leak on the connection.
  parameters <- tryCatch(
    dbFetch(rs, ...),
    finally = dbClearResult(rs, ...)
  )

  run_pipeline(pipeline, parameters)

}
Oops, something went wrong.