Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix compute for Teradata #1126

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions R/backend-teradata.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,95 @@ dbplyr_edition.Teradata <- function(con) {
2L
}

#' @export
sql_table_index.Teradata <- function(con, table, columns, name = NULL,
unique = FALSE, ...) {

message("sql_table_index.Teradata do nothing.")

# assertthat::assert_that(is_string(table) | is.schema(table), is.character(columns))
stopifnot(rlang::is_string(table) | dbplyr:::is.schema(table))
stopifnot(is.character(columns))

name <- name %||% paste0(c(unclass(table), columns), collapse = "_")
fields <- dbplyr::escape(dbplyr::ident(columns), parens = TRUE, con = con)

# not working
# build_sql(
# "CREATE ", if (unique) sql("UNIQUE "), "INDEX ", as.sql(name, con = con),
# " ON ", as.sql(table, con = con), " ", fields,
# con = con
# )

# run teradata query that do nothing
"select 1"

}

#' @export
sql_query_save.Teradata <- function(con, sql, name,
temporary = TRUE,
unique_indexes,
indexes,
partition_by,
index,
...) {

if(length(unique_indexes) > 0){
warning("argument unique_indexes ignored for Teradata in db_copy_to")
}
if(!is.null(indexes) && !is.list(indexes)){
stop("expect list of length 1 with character vector of arbitrary length for argument indexes for Teradata in db_copy_to")
} else if(length(indexes) > 1){
stop("expect list of length 1 with character vector of arbitrary length for argument indexes for Teradata in db_copy_to")
}
if(!missing(index) & missing(indexes)){
stop("Argument `index` was specified and argument`indexes` was not specified.\nArgument `index` must not be used. Use `indexes` instead.")
}

build_sql(
"CREATE MULTISET ",
if (temporary) "VOLATILE ",
"TABLE \n",
as.sql(name, con),
if(temporary) ", NO LOG",
" AS (\n",
sql,
"\n) WITH DATA\n",
if(!missing(indexes)) glue::glue_sql("\nPRIMARY INDEX ({`primary_index`*})", .con = con, primary_index = indexes[1][[1]]),
if(!missing(partition_by)) paste0("\n", partition_by, "\n"),
if (temporary) "\nON COMMIT PRESERVE ROWS",
con = con
)

}

#' @export
db_compute.Teradata <- function(con,
table,
sql,
temporary = TRUE,
unique_indexes = list(),
indexes = list(),
analyze = TRUE,
...) {
new <- db_table_temporary(con, table, temporary)
table <- new$table
temporary <- new$temporary

table <- dbplyr_save_query(
con, sql, table, temporary = temporary,
unique_indexes = unique_indexes,
indexes = indexes,
...
)
# create_indexes(con, table, unique_indexes, unique = TRUE)
# create_indexes(con, table, indexes)
if (analyze) dbplyr_analyze(con, table)

table
}

#' @export
sql_query_select.Teradata <- function(con, select, from, where = NULL,
group_by = NULL, having = NULL,
Expand All @@ -52,6 +141,44 @@ sql_query_select.Teradata <- function(con, select, from, where = NULL,
)
}

#' @export
db_copy_to.Teradata <- function(con, table, values,
overwrite = FALSE, types = NULL, temporary = TRUE,
unique_indexes = NULL, indexes = NULL,
analyze = FALSE, # non default # not sufficient to deactivate
...,
in_transaction = TRUE) {

new <- db_table_temporary(con, table, temporary)
table <- new$table
temporary <- new$temporary
call <- current_env()

with_transaction(con, in_transaction, {
tryCatch(
{
table <- dplyr::db_write_table(con, table,
types = types,
values = values,
temporary = temporary,
overwrite = overwrite,
unique_indexes = unique_indexes,
indexes = indexes,
...
)
# create_indexes(con, table, unique_indexes, unique = TRUE)
# create_indexes(con, table, indexes)
if (analyze) dbplyr_analyze(con, table)
},
error = function(cnd) {
cli_abort("Can't copy to table {.val {table}}", parent = cnd, call = call)
}
)
})

table
}

#' @export
sql_translation.Teradata <- function(con) {
sql_variant(
Expand Down Expand Up @@ -187,3 +314,5 @@ win_rank_tdata <- function(f) {
)
}
}