Allow PARQUET format for uploading data. #609

Merged: 24 commits, Jun 4, 2024

Commits
eaab36a  Allow PARQUET format for uploading data. (apalacio9502, May 18, 2024)
fa79291  Fixed typo in bq upload (apalacio9502, May 20, 2024)
41d2151  Add arrow to suggested packages (apalacio9502, May 20, 2024)
bbde49e  Delete verbose post metadata in bq_upload (apalacio9502, May 20, 2024)
95d8c3d  Drop R 3.6 check since we're losing it soon anyway and the arrow pack… (apalacio9502, May 20, 2024)
509feb6  Add importFrom httr PUT in bq_upload (apalacio9502, May 21, 2024)
a45a72c  Update news (apalacio9502, May 21, 2024)
b16fe81  Remove the multipart functions, as they are no longer required. (apalacio9502, May 21, 2024)
adab0bf  Improve the message indicating that Arrow is required for source_form… (apalacio9502, May 24, 2024)
fc8eea0  R 3.6 is no longer supported (apalacio9502, May 24, 2024)
76fd963  bq_perform_upload documentation specifies that the arrow package is r… (apalacio9502, May 24, 2024)
1e90f8f  Update news (apalacio9502, May 24, 2024)
3196088  Use arg_match instead of check_string in source_format (apalacio9502, May 27, 2024)
41fae01  Use defer() insted on.exit() (apalacio9502, May 27, 2024)
d7255b1  Use check_installed instead of requireNamespace (apalacio9502, May 27, 2024)
453e946  Style code (apalacio9502, May 27, 2024)
485a898  Implement BufferedOutputStream() to avoid writing to disk. (apalacio9502, May 27, 2024)
774b62e  The bq_upload function is improved (apalacio9502, May 27, 2024)
14a7e29  Delete a typo (apalacio9502, May 27, 2024)
af040e1  arrow as a string in check_installed (apalacio9502, May 27, 2024)
3a49fe6  Update importFrom in bq_upload (apalacio9502, May 27, 2024)
f293253  Remove headings from the development section. (apalacio9502, Jun 1, 2024)
a68d651  Arrow package replaced by nanoparquet for Parquet files. (apalacio9502, Jun 1, 2024)
56f4207  Apply suggestions from code review (hadley, Jun 4, 2024)
2 changes: 0 additions & 2 deletions .github/workflows/R-CMD-check.yaml
@@ -25,8 +25,6 @@ jobs:
           - {os: macos-latest, r: 'release'}
 
           - {os: windows-latest, r: 'release'}
-          # Use 3.6 to trigger usage of RTools35
-          - {os: windows-latest, r: '3.6'}
           # use 4.1 to check with rtools40's older compiler
           - {os: windows-latest, r: '4.1'}
 
7 changes: 4 additions & 3 deletions DESCRIPTION
@@ -13,7 +13,7 @@ License: MIT + file LICENSE
 URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery
 BugReports: https://github.com/r-dbi/bigrquery/issues
 Depends:
-    R (>= 3.6)
+    R (>= 4.0)
 Imports:
     bit64,
     brio,
@@ -28,8 +28,9 @@ Imports:
     methods,
     prettyunits,
     rlang (>= 1.1.0),
-    tibble
-Suggests:
+    tibble,
+    nanoparquet
+Suggests:
     blob,
     covr,
     dbplyr (>= 2.4.0),
2 changes: 2 additions & 0 deletions NAMESPACE
@@ -159,9 +159,11 @@ importFrom(httr,DELETE)
 importFrom(httr,GET)
 importFrom(httr,PATCH)
 importFrom(httr,POST)
+importFrom(httr,PUT)
 importFrom(httr,add_headers)
 importFrom(httr,config)
 importFrom(httr,content)
+importFrom(httr,headers)
 importFrom(httr,http_status)
 importFrom(httr,parse_media)
 importFrom(httr,status_code)
3 changes: 3 additions & 0 deletions NEWS.md
@@ -1,5 +1,8 @@
 # bigrquery (development version)
 
+* The `bq_perform_upload()` function now allows users to choose the transmission format (JSON or PARQUET) for data sent to BigQuery (@apalacio9502, #608).
+* bigrquery now requires R 4.0, in line with our version support principles.
+
 # bigrquery 1.5.1
 
 * Forward compatibility with upcoming dbplyr release (#601).
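
As a usage sketch of the NEWS entry above: assuming an authenticated session and a placeholder project `my-project` with dataset `my_dataset` (neither appears in this PR), a Parquet upload might look roughly like this. `upload_as_parquet()` is an illustrative helper, not part of bigrquery.

```r
# Hypothetical helper: uploads `values` to a BigQuery table as Parquet.
# The project, dataset, and table names are placeholders.
upload_as_parquet <- function(values,
                              project = "my-project",
                              dataset = "my_dataset",
                              table = "my_table") {
  tbl <- bigrquery::bq_table(project, dataset, table)

  job <- bigrquery::bq_perform_upload(
    tbl,
    values,
    source_format = "PARQUET", # the default remains "NEWLINE_DELIMITED_JSON"
    write_disposition = "WRITE_TRUNCATE"
  )

  # Block until the load job finishes
  bigrquery::bq_job_wait(job)
  invisible(tbl)
}
```

Calling `upload_as_parquet(mtcars)` would require valid credentials and an existing dataset, so it is left uncalled here.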
50 changes: 39 additions & 11 deletions R/bq-perform.R
@@ -91,6 +91,9 @@ bq_perform_extract <- function(x,
 #' @export
 #' @name api-perform
 #' @param values Data frame of values to insert.
+#' @param source_format The format of the data files:
+#'   * For newline-delimited JSON, specify "NEWLINE_DELIMITED_JSON".
+#'   * For parquet, specify "PARQUET".
 #' @param create_disposition Specifies whether the job is allowed to create
 #'   new tables.
 #'
@@ -110,6 +113,7 @@ bq_perform_extract <- function(x,
 #'   'duplicate' error is returned in the job result.
 bq_perform_upload <- function(x, values,
                               fields = NULL,
+                              source_format = c("NEWLINE_DELIMITED_JSON", "PARQUET"),
                               create_disposition = "CREATE_IF_NEEDED",
                               write_disposition = "WRITE_EMPTY",
                               ...,
@@ -121,12 +125,13 @@ bq_perform_upload <- function(x, values,
     cli::cli_abort("{.arg values} must be a data frame.")
   }
   fields <- as_bq_fields(fields)
+  arg_match(source_format)
   check_string(create_disposition)
   check_string(write_disposition)
   check_string(billing)
 
   load <- list(
-    sourceFormat = unbox("NEWLINE_DELIMITED_JSON"),
+    sourceFormat = unbox(source_format),
     destinationTable = tableReference(x),
     createDisposition = unbox(create_disposition),
     writeDisposition = unbox(write_disposition)
@@ -139,22 +144,30 @@ bq_perform_upload <- function(x, values,
     load$autodetect <- unbox(TRUE)
   }
 
-  config <- list(configuration = list(load = load))
-  config <- bq_body(config, ...)
-  config_part <- part(
-    c("Content-type" = "application/json; charset=UTF-8"),
-    jsonlite::toJSON(config, pretty = TRUE)
+  metadata <- list(configuration = list(load = load))
+  metadata <- bq_body(metadata, ...)
+  metadata <- list(
+    "type" = "application/json; charset=UTF-8",
+    "content" = jsonlite::toJSON(metadata, pretty = TRUE)
   )
 
-  data_part <- part(
-    c("Content-type" = "application/json; charset=UTF-8"),
-    export_json(values)
-  )
+  if (source_format == "NEWLINE_DELIMITED_JSON") {
+    media <- list(
+      "type" = "application/json; charset=UTF-8",
+      "content" = export_json(values)
+    )
+  } else {
+    media <- list(
+      "type" = "application/vnd.apache.parquet",
+      "content" = export_parquet(values)
+    )
+  }
 
   url <- bq_path(billing, jobs = "")
   res <- bq_upload(
     url,
-    parts = c(config_part, data_part),
+    metadata,
+    media,
     query = list(fields = "jobReference")
   )
   as_bq_job(res$jobReference)
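
The format dispatch in this hunk can be sketched in isolation. The helper name `media_type_for()` is hypothetical, and base R's `match.arg()` stands in for rlang's `arg_match()` so the snippet has no dependencies. Both functions return the matched value rather than modifying the argument, so the result has to be assigned for the default vector to collapse to a single string.

```r
media_type_for <- function(source_format = c("NEWLINE_DELIMITED_JSON", "PARQUET")) {
  # Assign the result: match.arg() returns the chosen value and does not
  # change `source_format` in place.
  source_format <- match.arg(source_format)

  # Content types are the ones used in the diff above
  switch(
    source_format,
    NEWLINE_DELIMITED_JSON = "application/json; charset=UTF-8",
    PARQUET = "application/vnd.apache.parquet"
  )
}

media_type_for()          # default resolves to the first choice (JSON)
media_type_for("PARQUET")
```

An unsupported value such as `"CSV"` raises an error at the matching step, before any request is built.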
@@ -186,6 +199,21 @@ export_json <- function(values) {
   rawToChar(rawConnectionValue(con))
 }
 
+# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet?hl=es-419
+export_parquet <- function(values) {
+
+  tmpfile <- tempfile(fileext = ".parquet")
+
+  defer(unlink(tmpfile))
+
+  # write to disk
+  nanoparquet::write_parquet(values, tmpfile)
+
+  # read back results
+  readBin(tmpfile, what = "raw", n = file.info(tmpfile)$size)
+
+}
+
 #' @export
 #' @name api-perform
 #' @param source_uris The fully-qualified URIs that point to your data in
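
A minimal sketch of the write-then-read-back pattern used by `export_parquet()`, with base R's `on.exit()` standing in for `defer()` so that it runs outside the package. `parquet_bytes()` is a hypothetical name, and the final check relies on the Parquet file format's 4-byte `PAR1` magic number, which opens and closes every Parquet file.

```r
parquet_bytes <- function(values) {
  tmpfile <- tempfile(fileext = ".parquet")
  # Clean up the temporary file when the function exits
  on.exit(unlink(tmpfile), add = TRUE)

  # Write to disk, then read the whole file back as a raw vector
  nanoparquet::write_parquet(values, tmpfile)
  readBin(tmpfile, what = "raw", n = file.size(tmpfile))
}

# Only run the demo when nanoparquet is installed
if (requireNamespace("nanoparquet", quietly = TRUE)) {
  bytes <- parquet_bytes(data.frame(x = 1:3, y = c("a", "b", "c")))
  rawToChar(bytes[1:4])
}
```

Returning a raw vector keeps the payload binary-safe, which is why it can be passed directly as a request body.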
77 changes: 28 additions & 49 deletions R/bq-request.R
@@ -147,17 +147,36 @@ bq_patch <- function(url, body, ..., query = NULL, token = bq_token()) {
   process_request(req)
 }
 
-#' @importFrom httr POST add_headers config
-bq_upload <- function(url, parts, ..., query = list(), token = bq_token()) {
-  url <- paste0(upload_url, url)
-  req <- POST_multipart_related(
-    url,
-    parts = parts,
-    token,
+#' @importFrom httr POST PUT add_headers headers config status_code
+# https://cloud.google.com/bigquery/docs/reference/api-uploads
+bq_upload <- function(url, metadata, media, query = list(), token = bq_token()) {
+
+  query <- utils::modifyList(list(fields = "jobReference",uploadType = "resumable"), query)
+  config <- add_headers("Content-Type" = metadata[["type"]])
+
+  req <- POST(
+    paste0(upload_url, url),
+    body = metadata[["content"]],
     httr::user_agent(bq_ua()),
-    ...,
-    query = prepare_bq_query(query)
+    token,
+    config,
+    query = query
   )
+
+  if (status_code(req) == 200) {
+
+    config <- add_headers("Content-Type" = media[["type"]])
+
+    req <- PUT(
+      headers(req)$location,
+      body = media[["content"]],
+      httr::user_agent(bq_ua()),
+      token,
+      config
+    )
+
+  }
+
   process_request(req)
 }
 
@@ -242,43 +261,3 @@ gargle_abort <- function(reason, message, status, call = caller_env()) {
   cli::cli_abort(message, class = class, call = call)
 }
 
-# Multipart/related ------------------------------------------------------------
-
-
-# http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
-POST_multipart_related <- function(url, config = NULL, parts = NULL,
-                                   query = list(), ...,
-                                   boundary = random_boundary(),
-                                   handle = NULL) {
-  if (is.null(config)) config <- config()
-
-  sep <- paste0("\n--", boundary, "\n")
-  end <- paste0("\n--", boundary, "--\n")
-
-  body <- paste0(sep, paste0(parts, collapse = sep), end)
-
-  type <- paste0("multipart/related; boundary=", boundary)
-  config <- c(config, add_headers("Content-Type" = type))
-
-  query <- utils::modifyList(list(uploadType = "multipart"), query)
-
-  POST(url, config = config, body = body, query = query, ..., handle = handle)
-}
-
-part <- function(headers, body) {
-  if (length(headers) == 0) {
-    header <- "\n"
-  } else {
-    header <- paste0(names(headers), ": ", headers, "\n", collapse = "")
-  }
-  body <- paste0(body, collapse = "\n")
-
-  paste0(header, "\n", body)
-}
-
-random_boundary <- function() {
-  valid <- c(LETTERS, letters, 0:9) # , "'", "(", ")", "+", ",", "-", ".", "/",
-  #  ":", "?")
-  paste0(sample(valid, 50, replace = TRUE), collapse = "")
-}
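
The two-step resumable upload that replaces the multipart request can be sketched roughly as follows. This is an illustration under assumptions rather than the package's implementation: `resumable_query()` and `resumable_upload()` are hypothetical names, authentication is omitted, and `url` must be a full upload endpoint supplied by the caller.

```r
# Merge caller-supplied query parameters over the resumable defaults;
# values in `query` take precedence over the defaults.
resumable_query <- function(query = list()) {
  utils::modifyList(
    list(fields = "jobReference", uploadType = "resumable"),
    query
  )
}

# Step 1: POST the job metadata to open an upload session.
# Step 2: PUT the data to the session URI returned in the Location header.
# No token is attached, so this would not succeed against the real API as-is.
resumable_upload <- function(url, metadata, media, query = list()) {
  resp <- httr::POST(
    url,
    body = metadata$content,
    httr::add_headers("Content-Type" = metadata$type),
    query = resumable_query(query)
  )

  if (httr::status_code(resp) == 200) {
    resp <- httr::PUT(
      httr::headers(resp)$location,
      body = media$content,
      httr::add_headers("Content-Type" = media$type)
    )
  }

  resp
}

# The query helper can be exercised without any network access
str(resumable_query(list(fields = "status")))
```

Splitting the request this way lets the metadata stay JSON while the data travels with its own content type, which a single text-assembled multipart body handles poorly for binary Parquet.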

21 changes: 11 additions & 10 deletions man/api-perform.Rd

Some generated files are not rendered by default.