Allow PARQUET format for uploading data. #609

Merged · 24 commits · Jun 4, 2024

Changes from 21 commits
eaab36a
Allow PARQUET format for uploading data.
apalacio9502 May 18, 2024
fa79291
Fixed typo in bq upload
apalacio9502 May 20, 2024
41d2151
Add arrow to suggested packages
apalacio9502 May 20, 2024
bbde49e
Delete verbose post metadata in bq_upload
apalacio9502 May 20, 2024
95d8c3d
Drop R 3.6 check since we're losing it soon anyway and the arrow pack…
apalacio9502 May 20, 2024
509feb6
Add importFrom httr PUT in bq_upload
apalacio9502 May 21, 2024
a45a72c
Update news
apalacio9502 May 21, 2024
b16fe81
Remove the multipart functions, as they are no longer required.
apalacio9502 May 21, 2024
adab0bf
Improve the message indicating that Arrow is required for source_form…
apalacio9502 May 24, 2024
fc8eea0
R 3.6 is no longer supported
apalacio9502 May 24, 2024
76fd963
bq_perform_upload documentation specifies that the arrow package is r…
apalacio9502 May 24, 2024
1e90f8f
Update news
apalacio9502 May 24, 2024
3196088
Use arg_match instead of check_string in source_format
apalacio9502 May 27, 2024
41fae01
Use defer() instead of on.exit()
apalacio9502 May 27, 2024
d7255b1
Use check_installed instead of requireNamespace
apalacio9502 May 27, 2024
453e946
Style code
apalacio9502 May 27, 2024
485a898
Implement BufferedOutputStream() to avoid writing to disk.
apalacio9502 May 27, 2024
774b62e
The bq_upload function is improved
apalacio9502 May 27, 2024
14a7e29
Delete a typo
apalacio9502 May 27, 2024
af040e1
arrow as a string in check_installed
apalacio9502 May 27, 2024
3a49fe6
Update importFrom in bq_upload
apalacio9502 May 27, 2024
f293253
Remove headings from the development section.
apalacio9502 Jun 1, 2024
a68d651
Arrow package replaced by nanoparquet for Parquet files.
apalacio9502 Jun 1, 2024
56f4207
Apply suggestions from code review
hadley Jun 4, 2024
2 changes: 0 additions & 2 deletions .github/workflows/R-CMD-check.yaml
@@ -25,8 +25,6 @@ jobs:
- {os: macos-latest, r: 'release'}

- {os: windows-latest, r: 'release'}
# Use 3.6 to trigger usage of RTools35
- {os: windows-latest, r: '3.6'}
# use 4.1 to check with rtools40's older compiler
- {os: windows-latest, r: '4.1'}

5 changes: 3 additions & 2 deletions DESCRIPTION
@@ -13,7 +13,7 @@ License: MIT + file LICENSE
URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery
BugReports: https://github.com/r-dbi/bigrquery/issues
Depends:
R (>= 3.6)
R (>= 4.0)
Imports:
bit64,
brio,
@@ -29,7 +29,8 @@ Imports:
prettyunits,
rlang (>= 1.1.0),
tibble
Suggests:
Suggests:
arrow,
Member:

Would you consider trying nanoparquet instead? It's very new but has no dependencies, so we could use it from imports.

Contributor Author:

I agree that nanoparquet is a good alternative because it has no dependencies. The only downside I see is that you would have to write to disk to read the raw data, as it lacks an output stream buffer implementation.

If you think the advantages outweigh the disadvantages, I could start testing and adapting the code.

Member:

Ah I didn't think about that, but I suspect it's still worthwhile given the lighter dependencies. Do you mind filing a nanoparquet issue to add a stream buffer output?

Contributor Author (apalacio9502, May 31, 2024):

I will begin implementing nanoparquet. If BufferedOutputStream is added in the future, an update will be necessary.

r-lib/nanoparquet#31
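The temp-file workaround described in this thread can be sketched as follows. This is a hedged sketch, not code from the PR: `df_to_parquet_raw` is a hypothetical helper name, and it assumes nanoparquet's `write_parquet()` writing to a file path.

```r
# Minimal sketch of the disk-based workaround discussed above: nanoparquet
# has no in-memory output stream (see r-lib/nanoparquet#31), so the data
# frame is serialized to a temporary file and read back as a raw vector.
# `df_to_parquet_raw` is a hypothetical name, not part of this PR.
df_to_parquet_raw <- function(values) {
  path <- tempfile(fileext = ".parquet")
  on.exit(unlink(path), add = TRUE)
  nanoparquet::write_parquet(values, path)
  readBin(path, what = "raw", n = file.size(path))
}
```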

blob,
covr,
dbplyr (>= 2.4.0),
2 changes: 2 additions & 0 deletions NAMESPACE
@@ -159,9 +159,11 @@ importFrom(httr,DELETE)
importFrom(httr,GET)
importFrom(httr,PATCH)
importFrom(httr,POST)
importFrom(httr,PUT)
importFrom(httr,add_headers)
importFrom(httr,config)
importFrom(httr,content)
importFrom(httr,headers)
importFrom(httr,http_status)
importFrom(httr,parse_media)
importFrom(httr,status_code)
9 changes: 9 additions & 0 deletions NEWS.md
@@ -1,5 +1,14 @@
# bigrquery (development version)

## Significant improvements
Member:
Can you leave these headings off? We add them as part of the final release process?


* The `bq_perform_upload()` function now allows users to choose the transmission
format (JSON or PARQUET) for data sent to BigQuery (#608).

## Dependency changes

* R 3.6 is no longer explicitly supported or tested. Our general practice is to support the current release (4.4), devel, and the 4 previous versions of R (4.3, 4.2, 4.1, 4.0).

# bigrquery 1.5.1

* Forward compatibility with upcoming dbplyr release (#601).
48 changes: 37 additions & 11 deletions R/bq-perform.R
@@ -91,6 +91,9 @@ bq_perform_extract <- function(x,
#' @export
#' @name api-perform
#' @param values Data frame of values to insert.
#' @param source_format The format of the data files:
#' * For newline-delimited JSON, specify "NEWLINE_DELIMITED_JSON".
#' * For Parquet, specify "PARQUET" (the arrow package is required).
#' @param create_disposition Specifies whether the job is allowed to create
#' new tables.
#'
@@ -110,6 +113,7 @@ bq_perform_extract <- function(x,
#' 'duplicate' error is returned in the job result.
bq_perform_upload <- function(x, values,
fields = NULL,
source_format = c("NEWLINE_DELIMITED_JSON", "PARQUET"),
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_EMPTY",
...,
@@ -121,12 +125,13 @@ bq_perform_upload <- function(x, values,
cli::cli_abort("{.arg values} must be a data frame.")
}
fields <- as_bq_fields(fields)
arg_match(source_format)
check_string(create_disposition)
check_string(write_disposition)
check_string(billing)

load <- list(
sourceFormat = unbox("NEWLINE_DELIMITED_JSON"),
sourceFormat = unbox(source_format),
destinationTable = tableReference(x),
createDisposition = unbox(create_disposition),
writeDisposition = unbox(write_disposition)
@@ -139,22 +144,30 @@ bq_perform_upload <- function(x, values,
load$autodetect <- unbox(TRUE)
}

config <- list(configuration = list(load = load))
config <- bq_body(config, ...)
config_part <- part(
c("Content-type" = "application/json; charset=UTF-8"),
jsonlite::toJSON(config, pretty = TRUE)
metadata <- list(configuration = list(load = load))
metadata <- bq_body(metadata, ...)
metadata <- list(
"type" = "application/json; charset=UTF-8",
"content" = jsonlite::toJSON(metadata, pretty = TRUE)
)

data_part <- part(
c("Content-type" = "application/json; charset=UTF-8"),
export_json(values)
)
if (source_format == "NEWLINE_DELIMITED_JSON") {
media <- list(
"type" = "application/json; charset=UTF-8",
"content" = export_json(values)
)
} else {
media <- list(
"type" = "application/vnd.apache.parquet",
"content" = export_parquet(values)
)
}

url <- bq_path(billing, jobs = "")
res <- bq_upload(
url,
parts = c(config_part, data_part),
metadata,
media,
query = list(fields = "jobReference")
)
as_bq_job(res$jobReference)
@@ -186,6 +199,19 @@ export_json <- function(values) {
rawToChar(rawConnectionValue(con))
}

# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet?hl=es-419
export_parquet <- function(values) {
  check_installed("arrow", "for source_format = `PARQUET`")

  con <- arrow::BufferOutputStream$create()
  defer(con$close())
  arrow::write_parquet(values, con)

  as.raw(arrow::buffer(con))
}

#' @export
#' @name api-perform
#' @param source_uris The fully-qualified URIs that point to your data in
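Taken together, the changes to `bq_perform_upload()` can be exercised as below. This is a hedged usage sketch, not code from the PR: the project, dataset, and table names are placeholders, it requires valid BigQuery credentials, and at this commit `source_format = "PARQUET"` needs the arrow package installed.

```r
library(bigrquery)

# Hypothetical destination table; replace with your own project/dataset/table.
tbl <- bq_table("my-project", "my_dataset", "mtcars")

# Upload a data frame using the new Parquet transmission format (#608).
job <- bq_perform_upload(
  tbl,
  values = mtcars,
  source_format = "PARQUET",
  create_disposition = "CREATE_IF_NEEDED",
  write_disposition = "WRITE_TRUNCATE"
)
bq_job_wait(job)
```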
77 changes: 28 additions & 49 deletions R/bq-request.R
@@ -147,17 +147,36 @@ bq_patch <- function(url, body, ..., query = NULL, token = bq_token()) {
process_request(req)
}

#' @importFrom httr POST add_headers config
bq_upload <- function(url, parts, ..., query = list(), token = bq_token()) {
url <- paste0(upload_url, url)
req <- POST_multipart_related(
url,
parts = parts,
token,
#' @importFrom httr POST PUT add_headers headers config status_code
# https://cloud.google.com/bigquery/docs/reference/api-uploads
bq_upload <- function(url, metadata, media, query = list(), token = bq_token()) {
  query <- utils::modifyList(list(fields = "jobReference", uploadType = "resumable"), query)
  config <- add_headers("Content-Type" = metadata[["type"]])

  req <- POST(
    paste0(upload_url, url),
    body = metadata[["content"]],
    httr::user_agent(bq_ua()),
    token,
    config,
    query = query
  )

  if (status_code(req) == 200) {
    config <- add_headers("Content-Type" = media[["type"]])

    req <- PUT(
      headers(req)$location,
      body = media[["content"]],
      httr::user_agent(bq_ua()),
      token,
      config
    )
  }

  process_request(req)
}

@@ -242,43 +261,3 @@ gargle_abort <- function(reason, message, status, call = caller_env()) {
cli::cli_abort(message, class = class, call = call)
}

# Multipart/related ------------------------------------------------------------


# http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
POST_multipart_related <- function(url, config = NULL, parts = NULL,
query = list(), ...,
boundary = random_boundary(),
handle = NULL) {
if (is.null(config)) config <- config()

sep <- paste0("\n--", boundary, "\n")
end <- paste0("\n--", boundary, "--\n")

body <- paste0(sep, paste0(parts, collapse = sep), end)

type <- paste0("multipart/related; boundary=", boundary)
config <- c(config, add_headers("Content-Type" = type))

query <- utils::modifyList(list(uploadType = "multipart"), query)

POST(url, config = config, body = body, query = query, ..., handle = handle)
}

part <- function(headers, body) {
if (length(headers) == 0) {
header <- "\n"
} else {
header <- paste0(names(headers), ": ", headers, "\n", collapse = "")
}
body <- paste0(body, collapse = "\n")

paste0(header, "\n", body)
}

random_boundary <- function() {
valid <- c(LETTERS, letters, 0:9) # , "'", "(", ")", "+", ",", "-", ".", "/",
# ":", "?")
paste0(sample(valid, 50, replace = TRUE), collapse = "")
}

21 changes: 11 additions & 10 deletions man/api-perform.Rd
