Skip to content

Commit

Permalink
feat: export/import DataFrame as raw vector (#1072)
Browse files Browse the repository at this point in the history
  • Loading branch information
eitsupi authored May 4, 2024
1 parent 7f2994f commit 32a97c6
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 61 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,5 @@ Collate:
'zzz.R'
Config/rextendr/version: 0.3.1
VignetteBuilder: knitr
Config/polars/LibVersion: 0.39.2
Config/polars/LibVersion: 0.39.3
Config/polars/RustToolchainVersion: nightly-2024-04-15
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Polars R Package (development version)

### New features

- `pl$read_ipc()` can read a raw vector of Apache Arrow IPC file (#1072).
- New method `<DataFrame>$to_raw_ipc()` to serialize a DataFrame to a raw vector
of Apache Arrow IPC file format (#1072).

## Polars R Package 0.16.3

### New features
Expand Down
2 changes: 2 additions & 0 deletions R/dataframe__frame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,8 @@ DataFrame_write_csv = function(
#' This functionality is considered **unstable**.
#' It may be changed at any point without it being considered a breaking change.
#' @rdname IO_write_ipc
#' @seealso
#' - [`<DataFrame>$to_raw_ipc()`][DataFrame_to_raw_ipc]
#' @examples
#' dat = pl$DataFrame(mtcars)
#'
Expand Down
6 changes: 4 additions & 2 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ test_rbackgroundhandler <- function(lambda, arg) .Call(wrap__test_rbackgroundhan

test_rthreadhandle <- function() .Call(wrap__test_rthreadhandle)

test_serde_df <- function(df) .Call(wrap__test_serde_df, df)

internal_wrap_e <- function(robj, str_to_lit) .Call(wrap__internal_wrap_e, robj, str_to_lit)

create_col <- function(name) .Call(wrap__create_col, name)
Expand Down Expand Up @@ -230,6 +228,10 @@ RPolarsDataFrame$write_csv <- function(file, include_bom, include_header, separa

RPolarsDataFrame$write_ipc <- function(file, compression, future) .Call(wrap__RPolarsDataFrame__write_ipc, self, file, compression, future)

RPolarsDataFrame$to_raw_ipc <- function(compression, future) .Call(wrap__RPolarsDataFrame__to_raw_ipc, self, compression, future)

RPolarsDataFrame$from_raw_ipc <- function(bits, n_rows, row_name, row_index, memory_map) .Call(wrap__RPolarsDataFrame__from_raw_ipc, bits, n_rows, row_name, row_index, memory_map)

RPolarsDataFrame$write_parquet <- function(file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit) .Call(wrap__RPolarsDataFrame__write_parquet, self, file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit)

RPolarsDataFrame$write_json <- function(file, pretty, row_oriented) .Call(wrap__RPolarsDataFrame__write_json, self, file, pretty, row_oriented)
Expand Down
62 changes: 57 additions & 5 deletions R/io_ipc.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pl_scan_ipc = function(
#'
#' @inherit pl_read_csv return
#' @inheritParams pl_scan_ipc
#' @param source A single character or a raw vector of Apache Arrow IPC file.
#' You can use globbing with `*` to scan/read multiple files in the same directory
#' (see examples).
#' @rdname IO_read_ipc
#' @examplesIf requireNamespace("arrow", quietly = TRUE) && arrow::arrow_with_dataset()
#' temp_dir = tempfile()
Expand All @@ -73,6 +76,15 @@ pl_scan_ipc = function(
#' pl$read_ipc(
#' file.path(temp_dir, "**/*.arrow")
#' )
#'
#' # Read a raw vector
#' arrow::arrow_table(
#' foo = 1:5,
#' bar = 6:10,
#' ham = letters[1:5]
#' ) |>
#' arrow::write_to_raw(format = "file") |>
#' pl$read_ipc()
pl_read_ipc = function(
source,
...,
Expand All @@ -82,9 +94,49 @@ pl_read_ipc = function(
row_index_offset = 0L,
rechunk = FALSE,
cache = TRUE) {
.args = as.list(environment())
result({
do.call(pl$scan_ipc, .args)$collect()
}) |>
unwrap("in pl$read_ipc():")
uw = function(res) unwrap(res, "in pl$read_ipc():")

if (isTRUE(is.raw(source))) {
.pr$DataFrame$from_raw_ipc(
source,
n_rows,
row_index_name,
row_index_offset,
memory_map
) |>
uw()
} else {
.args = as.list(environment())
result(do.call(pl$scan_ipc, .args)$collect()) |>
uw()
}
}


#' Write Arrow IPC data to a raw vector
#'
#' @inheritParams DataFrame_write_ipc
#' @return A raw vector
#' @seealso
#' - [`<DataFrame>$write_ipc()`][DataFrame_write_ipc]
#' @examples
#' df = pl$DataFrame(
#' foo = 1:5,
#' bar = 6:10,
#' ham = letters[1:5]
#' )
#'
#' raw_ipc = df$to_raw_ipc()
#'
#' pl$read_ipc(raw_ipc)
#'
#' if (require("arrow", quietly = TRUE)) {
#' arrow::read_ipc_file(raw_ipc, as_data_frame = FALSE)
#' }
DataFrame_to_raw_ipc = function(
compression = c("uncompressed", "zstd", "lz4"),
...,
future = FALSE) {
.pr$DataFrame$to_raw_ipc(self, compression, future) |>
unwrap("in $to_raw_ipc():")
}
51 changes: 51 additions & 0 deletions man/DataFrame_to_raw_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions man/IO_read_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions man/IO_write_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "r-polars"
version = "0.39.2"
version = "0.39.3"
edition = "2021"
rust-version = "1.76.0"
publish = false
Expand Down
42 changes: 24 additions & 18 deletions src/rust/src/rbackground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use extendr_api::{
use flume::{bounded, Sender};
use ipc_channel::ipc;
use once_cell::sync::Lazy;
use polars::prelude::Series as PSeries;
use polars::prelude as pl;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
Expand Down Expand Up @@ -93,31 +93,45 @@ pub fn deserialize_robj(bits: Vec<u8>) -> RResult<Robj> {
.when("deserializing an R object")
}

pub fn serialize_dataframe(dataframe: &mut polars::prelude::DataFrame) -> RResult<Vec<u8>> {
pub fn serialize_dataframe(
dataframe: &mut polars::prelude::DataFrame,
compression: Option<pl::IpcCompression>,
future: bool,
) -> RResult<Vec<u8>> {
use polars::io::SerWriter;

let mut dump = Vec::new();
polars::io::ipc::IpcWriter::new(&mut dump)
.with_compression(compression)
.with_pl_flavor(future)
.finish(dataframe)
.map_err(polars_to_rpolars_err)?;
Ok(dump)
}

pub fn deserialize_dataframe(bits: &[u8]) -> RResult<polars::prelude::DataFrame> {
pub fn deserialize_dataframe(
bits: &[u8],
n_rows: Option<usize>,
row_index: Option<polars::io::RowIndex>,
memory_map: bool,
) -> RResult<polars::prelude::DataFrame> {
use polars::io::SerReader;

polars::io::ipc::IpcReader::new(std::io::Cursor::new(bits))
.with_n_rows(n_rows)
.with_row_index(row_index)
.memory_mapped(memory_map)
.finish()
.map_err(polars_to_rpolars_err)
}

pub fn serialize_series(series: PSeries) -> RResult<Vec<u8>> {
serialize_dataframe(&mut std::iter::once(series).collect())
pub fn serialize_series(series: pl::Series) -> RResult<Vec<u8>> {
serialize_dataframe(&mut std::iter::once(series).collect(), None, true)
}

pub fn deserialize_series(bits: &[u8]) -> RResult<PSeries> {
let tn = std::any::type_name::<PSeries>();
deserialize_dataframe(bits)?
pub fn deserialize_series(bits: &[u8]) -> RResult<pl::Series> {
let tn = std::any::type_name::<pl::Series>();
deserialize_dataframe(bits, None, None, true)?
.get_columns()
.split_first()
.ok_or(RPolarsErr::new())
Expand Down Expand Up @@ -480,8 +494,8 @@ impl RBackgroundPool {
pub fn rmap_series(
&self,
raw_func: Vec<u8>,
series: PSeries,
) -> RResult<impl FnOnce() -> RResult<PSeries> + '_> {
series: pl::Series,
) -> RResult<impl FnOnce() -> RResult<pl::Series> + '_> {
#[cfg(feature = "rpolars_debug_print")]
dbg!("rmap_series");
let handler = self.lease()?;
Expand Down Expand Up @@ -579,13 +593,6 @@ pub fn test_rthreadhandle() -> RPolarsRThreadHandle<RResult<RPolarsDataFrame>> {
})
}

#[extendr]
pub fn test_serde_df(df: &RPolarsDataFrame) -> RResult<RPolarsDataFrame> {
let x = serialize_dataframe(&mut df.0.clone())?;
let df2 = deserialize_dataframe(x.as_slice())?;
Ok(RPolarsDataFrame(df2))
}

extendr_module! {
mod rbackground;
impl RPolarsRThreadHandle<RResult<RPolarsDataFrame>>;
Expand All @@ -595,5 +602,4 @@ extendr_module! {
fn handle_background_request;
fn test_rbackgroundhandler;
fn test_rthreadhandle;
fn test_serde_df;
}
27 changes: 27 additions & 0 deletions src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,33 @@ impl RPolarsDataFrame {
.map_err(polars_to_rpolars_err)
}

pub fn to_raw_ipc(&self, compression: Robj, future: Robj) -> RResult<Vec<u8>> {
let compression = rdatatype::new_ipc_compression(compression)?;
let future = robj_to!(bool, future)?;

crate::rbackground::serialize_dataframe(&mut self.0.clone(), compression, future)
}

pub fn from_raw_ipc(
bits: Robj,
n_rows: Robj,
row_name: Robj,
row_index: Robj,
memory_map: Robj,
) -> RResult<Self> {
let bits = robj_to!(Raw, bits)?;
let n_rows = robj_to!(Option, usize, n_rows)?;
let row_index = robj_to!(Option, String, row_name)?
.map(|name| {
robj_to!(u32, row_index).map(|offset| polars::io::RowIndex { name, offset })
})
.transpose()?;
let memory_map = robj_to!(bool, memory_map)?;
let df = crate::rbackground::deserialize_dataframe(&bits, n_rows, row_index, memory_map)?;

Ok(RPolarsDataFrame(df))
}

pub fn write_parquet(
&self,
file: Robj,
Expand Down
6 changes: 3 additions & 3 deletions src/rust/src/rdataframe/read_ipc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::lazy::dataframe::RPolarsLazyFrame as RLazyFrame;
use crate::lazy::dataframe::RPolarsLazyFrame;
use crate::robj_to;
use crate::rpolarserr::RResult;
use extendr_api::prelude::*;
Expand All @@ -14,7 +14,7 @@ pub fn import_arrow_ipc(
row_name: Robj,
row_index: Robj,
memory_map: Robj,
) -> RResult<RLazyFrame> {
) -> RResult<RPolarsLazyFrame> {
let args = ScanArgsIpc {
n_rows: robj_to!(Option, usize, n_rows)?,
cache: robj_to!(bool, cache)?,
Expand All @@ -27,7 +27,7 @@ pub fn import_arrow_ipc(
};
let lf = LazyFrame::scan_ipc(robj_to!(String, path)?, args)
.map_err(crate::rpolarserr::polars_to_rpolars_err)?;
Ok(RLazyFrame(lf))
Ok(RPolarsLazyFrame(lf))
}

extendr_module! {
Expand Down
Loading

0 comments on commit 32a97c6

Please sign in to comment.