
Commit

Merge pull request #95 from ssi-dk/chore/restructuring
Move functions to separate files
RasmusSkytte authored Feb 1, 2024
2 parents f22e842 + 5f043bd commit cd3b158
Showing 38 changed files with 1,274 additions and 1,251 deletions.
117 changes: 0 additions & 117 deletions R/connection.R
@@ -112,120 +112,3 @@ close_connection <- function(conn) {

DBI::dbDisconnect(conn)
}
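
For context, the typical lifecycle around close_connection() — a sketch assuming the SQLite driver:

conn <- get_connection(drv = RSQLite::SQLite())
# ... work with the database ...
close_connection(conn)   # wraps DBI::dbDisconnect(conn)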


#' Convenience function for DBI::Id
#'
#' @template db_table_id
#' @template conn
#' @param allow_table_only
#' logical. If `TRUE`, allows returning a `DBI::Id` with `table` = `myschema.table` if schema `myschema`
#' is not found in `conn`.
#' If `FALSE`, the function will raise an error if the implied schema cannot be found in `conn`.
#' @details The given `db_table_id` is parsed to a DBI::Id depending on the type of input:
#' * `character`: db_table_id is parsed to a DBI::Id object assuming "schema.table" syntax
#' with corresponding schema (if found in `conn`) and table values.
#' If no schema is implied, the default schema of `conn` will be used.
#'
#' * `DBI::Id`: if schema is not specified in `Id`, the schema is set to the default schema for `conn` (if given).
#'
#' * `tbl_sql`: the remote name is used to resolve the table identification.
#'
#' @return A DBI::Id object parsed from db_table_id (see details)
#' @examples
#' id("schema.table")
#' @seealso [DBI::Id] which this function wraps.
#' @export
id <- function(db_table_id, conn = NULL, allow_table_only = TRUE) {
UseMethod("id")
}


#' @export
id.Id <- function(db_table_id, conn = NULL, allow_table_only = TRUE) {
return(DBI::Id(schema = purrr::pluck(db_table_id, "name", "schema", .default = SCDB::get_schema(conn)),
table = purrr::pluck(db_table_id, "name", "table")))
}


#' @export
id.character <- function(db_table_id, conn = NULL, allow_table_only = TRUE) {

checkmate::assert(is.null(conn), DBI::dbIsValid(conn), combine = "or")

if (stringr::str_detect(db_table_id, "\\.")) {
db_name <- stringr::str_split_1(db_table_id, "\\.")
db_schema <- db_name[1]
db_table <- db_name[2]

# If no matching implied schema is found, return the unmodified db_table_id in the default schema
if (allow_table_only && !is.null(conn) && !schema_exists(conn, db_schema)) {
return(DBI::Id(schema = get_schema(conn), table = db_table_id))
}
} else {
db_schema <- get_schema(conn)
db_table <- db_table_id
}

return(DBI::Id(schema = db_schema, table = db_table))
}
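
A minimal sketch of the parsing rules above, assuming an SQLite connection (where the default schema is typically "main"):

conn <- get_connection(drv = RSQLite::SQLite())

# "schema.table" syntax is split on the dot when the schema exists
id("main.mtcars", conn)       # Id with schema = "main", table = "mtcars"

# With allow_table_only = TRUE, an unknown schema keeps the dotted name
# as a literal table name in the default schema
id("myschema.table", conn)    # Id with schema = "main", table = "myschema.table"

close_connection(conn)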


#' @export
id.tbl_dbi <- function(db_table_id, conn = NULL, allow_table_only = TRUE) {

# If the table identification is fully qualified, extract the Id from the remote table
if (!is.na(purrr::pluck(dbplyr::remote_table(db_table_id), unclass, "schema"))) {

table_ident <- dbplyr::remote_table(db_table_id) |>
unclass() |>
purrr::discard(is.na)

table_conn <- dbplyr::remote_con(db_table_id)

return(
DBI::Id(
catalog = purrr::pluck(table_ident, "catalog"),
schema = purrr::pluck(table_ident, "schema", .default = get_schema(table_conn)),
table = purrr::pluck(table_ident, "table")
)
)

} else {

# If not, attempt to resolve the table from the existing tables.
# For SQLite, there should only be one table in main/temp matching the name.
# In some cases, tables may have been added to the database that make the id ambiguous.
schema <- get_tables(dbplyr::remote_con(db_table_id), show_temporary = TRUE) |>
dplyr::filter(.data$table == dbplyr::remote_name(db_table_id)) |>
dplyr::pull("schema")

if (length(schema) > 1) {
stop(
"Table identification has been corrupted! ",
"This table does not contain information about its schema and ",
"multiple tables with this name were found across schemas."
)
}

return(DBI::Id(schema = schema, table = dbplyr::remote_name(db_table_id)))
}
}
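
A sketch of the tbl_dbi path, assuming a table copied onto the connection with dplyr::copy_to():

conn <- get_connection(drv = RSQLite::SQLite())
dplyr::copy_to(conn, mtcars, name = "mtcars", temporary = FALSE)

# A lazy table carries its remote name; id() resolves the schema by
# looking the name up among the tables on the connection
id(dplyr::tbl(conn, "mtcars"))

close_connection(conn)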


#' @export
as.character.Id <- function(x, ...) {

info <- x@name |>
purrr::discard(is.na)

id_representation <- list(
catalog = purrr::pluck(info, "catalog"),
schema = purrr::pluck(info, "schema"),
table = purrr::pluck(info, "table")
) |>
purrr::discard(is.null) |>
do.call(purrr::partial(paste, sep = "."), args = _)

return(id_representation)
}
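
And the string round-trip: NA components of the Id are dropped and the remainder joined with dots.

as.character(DBI::Id(schema = "test", table = "logs"))   # "test.logs"
as.character(DBI::Id(table = "logs"))                    # "logs"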
34 changes: 34 additions & 0 deletions R/create_logs_if_missing.R
@@ -0,0 +1,34 @@
#' Create a table with the SCDB log structure if it does not exist
#' @template conn
#' @param log_table A specification of where the logs should exist ("schema.table")
#' @return A tbl_dbi with the generated (or existing) log table
#' @examples
#' conn <- get_connection(drv = RSQLite::SQLite())
#' log_table_id <- id("test.logs", conn = conn, allow_table_only = TRUE)
#'
#' create_logs_if_missing(log_table_id, conn)
#'
#' close_connection(conn)
#' @export
create_logs_if_missing <- function(log_table, conn) {

checkmate::assert_class(conn, "DBIConnection")

if (!table_exists(conn, log_table)) {
log_signature <- data.frame(date = as.POSIXct(NA),
schema = NA_character_,
table = NA_character_,
n_insertions = NA_integer_,
n_deactivations = NA_integer_,
start_time = as.POSIXct(NA),
end_time = as.POSIXct(NA),
duration = NA_character_,
success = NA,
message = NA_character_,
log_file = NA_character_)

DBI::dbCreateTable(conn, id(log_table, conn), log_signature)
}

return(dplyr::tbl(conn, id(log_table, conn), check_from = FALSE))
}
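
A sketch of what the helper returns, with the column layout taken from the log_signature above:

conn <- get_connection(drv = RSQLite::SQLite())
logs <- create_logs_if_missing("logs", conn)

logs |> dplyr::collect()
# A 0-row table with columns: date, schema, table, n_insertions,
# n_deactivations, start_time, end_time, duration, success, message, log_file

close_connection(conn)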
83 changes: 0 additions & 83 deletions R/create_table.R
@@ -70,86 +70,3 @@ create_table <- function(.data, conn = NULL, db_table_id, ...) {

return(invisible(dplyr::tbl(conn, db_table_id, check_from = FALSE)))
}



#' @importFrom methods setGeneric
methods::setGeneric("getTableSignature",
function(.data, conn = NULL) standardGeneric("getTableSignature"),
signature = "conn")

methods::setMethod("getTableSignature", "DBIConnection", function(.data, conn) {
# Define the column types to be updated based on backend class
col_types <- DBI::dbDataType(conn, .data)

backend_coltypes <- list(
"PqConnection" = c(
checksum = "TEXT",
from_ts = "TIMESTAMP",
until_ts = "TIMESTAMP"
),
"SQLiteConnection" = c(
checksum = "TEXT",
from_ts = "TEXT",
until_ts = "TEXT"
),
"Microsoft SQL Server" = c(
checksum = "varchar(32)",
from_ts = "DATETIME2",
until_ts = "DATETIME2"
)
)

checkmate::assert_choice(class(conn), names(backend_coltypes))

# Update columns with indices instead of names to avoid conflicts
special_cols <- backend_coltypes[[class(conn)]]
special_indices <- (1 + length(.data) - length(special_cols)):length(.data)

return(replace(col_types, special_indices, special_cols))
})

methods::setMethod("getTableSignature", "NULL", function(.data, conn) {
# Emulate product of DBI::dbDataType
signature <- dplyr::summarise(.data, dplyr::across(tidyselect::everything(), ~ class(.)[1]))

  return(stats::setNames(as.character(signature), names(signature)))
})


#' Create a table with the SCDB log structure if it does not exist
#' @template conn
#' @param log_table A specification of where the logs should exist ("schema.table")
#' @return A tbl_dbi with the generated (or existing) log table
#' @examples
#' conn <- get_connection(drv = RSQLite::SQLite())
#' log_table_id <- id("test.logs", conn = conn, allow_table_only = TRUE)
#'
#' create_logs_if_missing(log_table_id, conn)
#'
#' close_connection(conn)
#' @export
create_logs_if_missing <- function(log_table, conn) {

checkmate::assert_class(conn, "DBIConnection")

if (!table_exists(conn, log_table)) {
log_signature <- data.frame(date = as.POSIXct(NA),
schema = NA_character_,
table = NA_character_,
n_insertions = NA_integer_,
n_deactivations = NA_integer_,
start_time = as.POSIXct(NA),
end_time = as.POSIXct(NA),
duration = NA_character_,
success = NA,
message = NA_character_,
log_file = NA_character_)

DBI::dbCreateTable(conn, id(log_table, conn), log_signature)
}

return(dplyr::tbl(conn, id(log_table, conn), check_from = FALSE))
}
68 changes: 68 additions & 0 deletions R/filter_keys.R
@@ -0,0 +1,68 @@
#' Filters .data according to all records in the filter
#'
#' @description
#' If `filters` is `NULL`, no filtering is done.
#' Otherwise, `.data` is filtered by an inner join using all columns of `filters`:
#' \code{inner_join(.data, filters, by = colnames(filters))}
#'
#' `by` and `na_by` can override the columns used in the join.
#'
#' @template .data
#' @template filters
#' @param by passed to `inner_join()` if not `NULL`
#' @param na_by passed to `inner_join()` if not `NULL`
#' @template .data_return
#' @examples
#' # Filtering with NULL means no filtering is done
#' filter <- NULL
#' identical(filter_keys(mtcars, filter), mtcars) # TRUE
#'
#' # Filtering by vs = 0
#' filter <- data.frame(vs = 0)
#' identical(filter_keys(mtcars, filter), dplyr::filter(mtcars, vs == 0)) # TRUE
#'
#' # Filtering by the specific combinations of vs = 0 and am = 1
#' filter <- dplyr::distinct(mtcars, vs, am)
#' filter_keys(mtcars, filter)
#'
#' @importFrom rlang .data
#' @export
filter_keys <- function(.data, filters, by = NULL, na_by = NULL) {
if (is.null(filters)) {
return(.data)
}

assert_data_like(.data)
assert_data_like(filters, null.ok = TRUE)
checkmate::assert_subset(c(by, na_by), colnames(filters))

UseMethod("filter_keys")
}

#' @export
filter_keys.tbl_sql <- function(.data, filters, by = NULL, na_by = NULL) {

if (is.null(by) && is.null(na_by)) {
    # Count the non-NA values in each filter column: columns containing only
    # NA values must be matched using na_by, the rest using by
    key_types <- filters |>
      dplyr::ungroup() |>
      dplyr::summarise(dplyr::across(
        .cols = tidyselect::everything(),
        .fns = ~ sum(ifelse(is.na(.), 0, 1), na.rm = TRUE)
      )) |>
      tidyr::pivot_longer(tidyselect::everything(), names_to = "column_name", values_to = "n_not_na")

    by <- key_types |> dplyr::filter(.data$n_not_na > 0) |> dplyr::pull("column_name")
    na_by <- key_types |> dplyr::filter(.data$n_not_na == 0) |> dplyr::pull("column_name")

if (length(by) == 0) by <- NULL
if (length(na_by) == 0) na_by <- NULL
}
return(dplyr::inner_join(.data, filters, by = by, na_by = na_by))
}

#' @export
filter_keys.data.frame <- function(.data, filters, by = NULL, na_by = NULL) {
if (is.null(by) && is.null(na_by)) by <- colnames(filters)
return(dplyr::inner_join(.data, filters, by = c(by, na_by)))
}
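
A sketch of the database path, where the by/na_by split matters because SQL joins do not match NULL keys (assuming an SQLite connection):

conn <- get_connection(drv = RSQLite::SQLite())
data <- dplyr::copy_to(conn, mtcars, name = "mtcars")
filters <- dplyr::copy_to(conn, data.frame(vs = 0, am = 1), name = "filters")

# Keeps only the rows of mtcars with vs == 0 and am == 1
filter_keys(data, filters)

close_connection(conn)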
44 changes: 44 additions & 0 deletions R/getTableSignature.R
@@ -0,0 +1,44 @@
#' @importFrom methods setGeneric
methods::setGeneric("getTableSignature",
function(.data, conn = NULL) standardGeneric("getTableSignature"),
signature = "conn")

methods::setMethod("getTableSignature", "DBIConnection", function(.data, conn) {
# Define the column types to be updated based on backend class
col_types <- DBI::dbDataType(conn, .data)

backend_coltypes <- list(
"PqConnection" = c(
checksum = "TEXT",
from_ts = "TIMESTAMP",
until_ts = "TIMESTAMP"
),
"SQLiteConnection" = c(
checksum = "TEXT",
from_ts = "TEXT",
until_ts = "TEXT"
),
"Microsoft SQL Server" = c(
checksum = "varchar(32)",
from_ts = "DATETIME2",
until_ts = "DATETIME2"
)
)

checkmate::assert_choice(class(conn), names(backend_coltypes))

# Update columns with indices instead of names to avoid conflicts
special_cols <- backend_coltypes[[class(conn)]]
special_indices <- (1 + length(.data) - length(special_cols)):length(.data)

return(replace(col_types, special_indices, special_cols))
})

methods::setMethod("getTableSignature", "NULL", function(.data, conn) {
# Emulate product of DBI::dbDataType
signature <- dplyr::summarise(.data, dplyr::across(tidyselect::everything(), ~ class(.)[1]))

  return(stats::setNames(as.character(signature), names(signature)))
})
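
For illustration, a sketch of the two methods (getTableSignature is internal, so it is shown here via :::; SQLite types assumed):

conn <- get_connection(drv = RSQLite::SQLite())

# With a connection, DBI::dbDataType() supplies the types, and the last
# three columns are overridden with the backend-specific types above
SCDB:::getTableSignature(data.frame(x = 1, checksum = "", from_ts = "", until_ts = ""), conn)
# e.g. x = "DOUBLE", checksum = "TEXT", from_ts = "TEXT", until_ts = "TEXT"

# Without a connection, the R classes stand in for SQL types
SCDB:::getTableSignature(data.frame(x = 1, y = "a"), conn = NULL)
# e.g. x = "numeric", y = "character"

close_connection(conn)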
