diff --git a/src/function/Cargo.toml b/src/function/Cargo.toml
index 0d841185..41aa6e15 100644
--- a/src/function/Cargo.toml
+++ b/src/function/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "cloud-function"
+name = "function"
 version = "0.1.0"
 description = "The generic cloud functions for serverless computation."
 authors = [ "Gang Liao <gangliao@cs.umd.edu>" ]
diff --git a/src/function/src/aws/lambda.rs b/src/function/src/aws/lambda.rs
index 0c621970..fdf7719b 100644
--- a/src/function/src/aws/lambda.rs
+++ b/src/function/src/aws/lambda.rs
@@ -99,7 +99,12 @@ fn invoke_async_functions(ctx: &ExecutionContext, batches: &mut Vec<RecordBatch>) {
     loop {
         let request = InvokeAsyncRequest {
             function_name: next_func.clone(),
-            invoke_args: Payload::to_bytes(&[batches.pop().unwrap()], uuid.clone()),
+            invoke_args: Payload::to_vec(
+                &[batches.pop().unwrap()],
+                uuid.clone(),
+                Encoding::default(),
+            )
+            .into(),
         };

         if let Ok(reponse) = block_on(client.invoke_async(request)) {
@@ -204,7 +209,7 @@ async fn payload_handler(ctx: &mut ExecutionContext, event: Value) -> Result<Value> {
diff --git a/src/runtime/src/context.rs b/src/runtime/src/context.rs
index ea767aff..1399d486 100644
--- a/src/runtime/src/context.rs
+++ b/src/runtime/src/context.rs
@@ -145,7 +145,7 @@ impl ExecutionContext {
             Encoding::Snappy | Encoding::Lz4 | Encoding::Zstd => {
                 let encoded: Vec<u8> = serde_json::to_vec(&self).unwrap();
                 serde_json::to_string(&CloudEnvironment {
-                    context: encoding.compress(&encoded),
+                    context: encoding.compress(encoded),
                     encoding,
                 })
                 .unwrap()
diff --git a/src/runtime/src/encoding.rs b/src/runtime/src/encoding.rs
index 255b6940..ff15b076 100644
--- a/src/runtime/src/encoding.rs
+++ b/src/runtime/src/encoding.rs
@@ -51,7 +51,7 @@ impl Default for Encoding {

 impl Encoding {
     /// Compress data
-    pub fn compress(&self, s: &[u8]) -> Vec<u8> {
+    pub fn compress(&self, s: Vec<u8>) -> Vec<u8> {
         match *self {
             Encoding::Snappy => {
                 let mut encoder = snap::raw::Encoder::new();
@@ -59,9 +59,8 @@ impl Encoding {
             }
             Encoding::Lz4 => lz4::block::compress(&s, None, true).unwrap(),
             Encoding::Zstd => zstd::block::compress(&s, 3).unwrap(),
-            _ => {
-                unimplemented!();
-            }
+            Encoding::None => s,
+            _ => unimplemented!(),
         }
     }

@@ -149,7 +148,7 @@ mod tests {
         let json = serde_json::to_string(&plan).unwrap();

         let now = Instant::now();
-        let en_json = en.compress(&json.as_bytes());
+        let en_json = en.compress(json.as_bytes().to_vec());
         println!("Compression time: {} μs", now.elapsed().as_micros());

         let now = Instant::now();
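Why `compress` now takes `s: Vec<u8>` by value: the new `Encoding::None` arm can return the input buffer unchanged, so an uncompressed payload costs a move instead of a copy, while the compressing arms only ever needed a borrow. A minimal sketch of that contract, using a two-variant stand-in for the real enum in src/runtime/src/encoding.rs (the `zstd::block::compress` call mirrors the one in the diff; `main` is illustrative only):

```rust
/// Two-variant stand-in for the crate's `Encoding` enum.
enum Encoding {
    Zstd,
    None,
}

impl Encoding {
    /// Taking the buffer by value lets the `None` arm hand it back untouched.
    fn compress(&self, s: Vec<u8>) -> Vec<u8> {
        match *self {
            Encoding::Zstd => zstd::block::compress(&s, 3).unwrap(),
            Encoding::None => s, // a move, not a memcpy
        }
    }
}

fn main() {
    let buf = b"some serialized query plan".to_vec();
    let ptr = buf.as_ptr();
    let out = Encoding::None.compress(buf);
    // The pass-through arm returns the very same allocation.
    assert_eq!(ptr, out.as_ptr());

    // The compressing arms still borrow and produce a fresh buffer.
    let z = Encoding::Zstd.compress(out);
    assert!(!z.is_empty());
}
```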
diff --git a/src/runtime/src/executor/mod.rs b/src/runtime/src/executor/mod.rs
index 2f113f95..eef4b4f1 100644
--- a/src/runtime/src/executor/mod.rs
+++ b/src/runtime/src/executor/mod.rs
@@ -27,6 +27,7 @@
 use crate::config::GLOBALS as globals;
 use crate::context::CloudFunction;
 use crate::context::ExecutionContext;
+use crate::encoding::Encoding;
 use crate::error::{Result, SquirtleError};
 use crate::payload::{Payload, Uuid};
 use arrow::record_batch::RecordBatch;
@@ -123,7 +124,11 @@ pub trait Executor {
         assert_eq!(1, output_partitions.len());
         assert_eq!(1, output_partitions[0].len());

-        Ok(Payload::to_value(&output_partitions[0], Uuid::default()))
+        Ok(Payload::to_value(
+            &output_partitions[0],
+            Uuid::default(),
+            Encoding::default(),
+        ))
     }
 }
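The payload.rs rework below is the core of this change: instead of serializing all `DataFrame`s into one opaque byte blob and compressing the blob, `Payload.data` becomes a typed `Vec<DataFrame>` and each frame's Flight header and body are compressed individually, which lets rayon spread the (de)compression across frames. A sketch of that pattern with stand-in types (`compress_frames` is a hypothetical helper for illustration, not a function in the diff; the real code plugs in `Encoding::compress` and Arrow Flight bytes):

```rust
use rayon::prelude::*;

/// Stand-in for the crate's `DataFrame`: compressed Flight header/body bytes.
struct DataFrame {
    header: Vec<u8>,
    body: Vec<u8>,
}

/// Compress every frame in parallel; each frame is an independent work item.
fn compress_frames(
    frames: Vec<DataFrame>,
    codec: impl Fn(Vec<u8>) -> Vec<u8> + Sync,
) -> Vec<DataFrame> {
    frames
        .into_par_iter() // rayon schedules frames across worker threads
        .map(|f| DataFrame {
            header: codec(f.header),
            body: codec(f.body),
        })
        .collect()
}

fn main() {
    let frames = vec![
        DataFrame { header: vec![1, 2], body: vec![3, 4, 5] },
        DataFrame { header: vec![6], body: vec![7, 8] },
    ];
    // Identity codec keeps the sketch dependency-free; the real code would
    // pass `|b| encoding.compress(b)`.
    let out = compress_frames(frames, |b| b);
    assert_eq!(2, out.len());
}
```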
diff --git a/src/runtime/src/payload.rs b/src/runtime/src/payload.rs
index d0ce259c..f20338b6 100644
--- a/src/runtime/src/payload.rs
+++ b/src/runtime/src/payload.rs
@@ -20,6 +20,7 @@
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_flight::utils::{flight_data_from_arrow_batch, flight_data_to_arrow_batch};
 use arrow_flight::FlightData;
+use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use std::sync::Arc;
@@ -90,7 +91,7 @@ pub struct Uuid {
 }

 /// Arrow Flight Data format
-#[derive(Default, Debug, Deserialize, Serialize)]
+#[derive(Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub struct DataFrame {
     /// Arrow Flight Data's header.
     #[serde(with = "serde_bytes")]
@@ -107,8 +108,7 @@ pub struct Payload {
     /// The data batches in the payload.
-    #[serde(with = "serde_bytes")]
-    pub data: Vec<u8>,
+    pub data: Vec<DataFrame>,
     /// The subplan's schema.
     schema: SchemaRef,
     /// The query's uuid.
@@ -134,20 +134,20 @@ impl Payload {
     pub fn to_batch(event: Value) -> (Vec<RecordBatch>, Uuid) {
         let payload: Payload = serde_json::from_value(event).unwrap();
         let uuid = payload.uuid.clone();
-        let mut data_frames = unmarshal(&payload);
-        let nums = data_frames.len();
+        let schema = payload.schema.clone();
+        let data_frames = unmarshal(payload);
         (
-            (0..nums)
-                .map(|_| {
-                    let data = data_frames.pop().unwrap();
+            data_frames
+                .into_par_iter()
+                .map(|d| {
                     flight_data_to_arrow_batch(
                         &FlightData {
-                            data_body: data.body,
-                            data_header: data.header,
+                            data_body: d.body,
+                            data_header: d.header,
                             app_metadata: vec![],
                             flight_descriptor: None,
                         },
-                        payload.schema.clone(),
+                        schema.clone(),
                         &[],
                     )
                     .unwrap()
@@ -158,108 +158,64 @@ impl Payload {
     }

     /// Convert record batch to payload for network transmission.
-    pub fn to_value(batches: &[RecordBatch], uuid: Uuid) -> Value {
+    pub fn to_value(batches: &[RecordBatch], uuid: Uuid, encoding: Encoding) -> Value {
         let options = arrow::ipc::writer::IpcWriteOptions::default();
-
-        let data_frames = (0..batches.len())
-            .map(|i| {
-                let (_, flight_data) = flight_data_from_arrow_batch(&batches[i], &options);
+        let data_frames = batches
+            .par_iter()
+            .map(|b| {
+                let (_, flight_data) = flight_data_from_arrow_batch(&b, &options);
                 DataFrame {
-                    header: flight_data.data_header,
-                    body: flight_data.data_body,
+                    header: encoding.compress(flight_data.data_header),
+                    body: encoding.compress(flight_data.data_body),
                 }
             })
             .collect();

-        marshal2value(&data_frames, batches[0].schema(), uuid, Encoding::default())
+        serde_json::to_value(&Payload {
+            data: data_frames,
+            schema: batches[0].schema(),
+            uuid,
+            encoding,
+        })
+        .unwrap()
     }

     /// Convert record batch to payload for network transmission.
-    pub fn to_bytes(batches: &[RecordBatch], uuid: Uuid) -> bytes::Bytes {
+    pub fn to_vec(batches: &[RecordBatch], uuid: Uuid, encoding: Encoding) -> Vec<u8> {
         let options = arrow::ipc::writer::IpcWriteOptions::default();
-
-        let data_frames = (0..batches.len())
-            .map(|i| {
-                let (_, flight_data) = flight_data_from_arrow_batch(&batches[i], &options);
+        let data_frames = batches
+            .par_iter()
+            .map(|b| {
+                let (_, flight_data) = flight_data_from_arrow_batch(&b, &options);
                 DataFrame {
-                    header: flight_data.data_header,
-                    body: flight_data.data_body,
+                    header: encoding.compress(flight_data.data_header),
+                    body: encoding.compress(flight_data.data_body),
                 }
             })
            .collect();

-        marshal2bytes(&data_frames, batches[0].schema(), uuid, Encoding::default())
-    }
-}
-
-/// Serialize `Payload` in cloud functions.
-pub fn marshal2value(
-    data: &Vec<DataFrame>,
-    schema: SchemaRef,
-    uuid: Uuid,
-    encoding: Encoding,
-) -> Value {
-    match encoding {
-        Encoding::Snappy | Encoding::Lz4 | Encoding::Zstd => {
-            let encoded: Vec<u8> = serde_json::to_vec(&data).unwrap();
-            serde_json::to_value(&Payload {
-                data: encoding.compress(&encoded),
-                schema,
-                uuid,
-                encoding,
-            })
-            .unwrap()
-        }
-        Encoding::None => serde_json::to_value(&Payload {
-            data: serde_json::to_vec(&data).unwrap(),
-            schema,
-            uuid,
-            encoding,
-        })
-        .unwrap(),
-        _ => unimplemented!(),
-    }
-}
-
-/// Serialize `Payload` in cloud functions.
-pub fn marshal2bytes(
-    data: &Vec<DataFrame>,
-    schema: SchemaRef,
-    uuid: Uuid,
-    encoding: Encoding,
-) -> bytes::Bytes {
-    match encoding {
-        Encoding::Snappy | Encoding::Lz4 | Encoding::Zstd => {
-            let encoded: Vec<u8> = serde_json::to_vec(&data).unwrap();
-            serde_json::to_vec(&Payload {
-                data: encoding.compress(&encoded),
-                schema,
-                uuid,
-                encoding,
-            })
-            .unwrap()
-            .into()
-        }
-        Encoding::None => serde_json::to_vec(&Payload {
-            data: serde_json::to_vec(&data).unwrap(),
-            schema,
+        serde_json::to_vec(&Payload {
+            data: data_frames,
+            schema: batches[0].schema(),
             uuid,
             encoding,
         })
         .unwrap()
-        .into(),
-        _ => unimplemented!(),
     }
 }

 /// Deserialize `DataFrame` from cloud functions.
-pub fn unmarshal(payload: &Payload) -> Vec<DataFrame> {
+pub fn unmarshal(payload: Payload) -> Vec<DataFrame> {
     match payload.encoding {
-        Encoding::Snappy | Encoding::Lz4 | Encoding::Zstd => {
-            let encoded = payload.encoding.decompress(&payload.data);
-            serde_json::from_slice(&encoded).unwrap()
-        }
-        Encoding::None => serde_json::from_slice(&payload.data).unwrap(),
+        Encoding::Snappy | Encoding::Lz4 | Encoding::Zstd => payload
+            .data
+            .par_iter()
+            .map(|d| DataFrame {
+                header: payload.encoding.decompress(&d.header),
+                body: payload.encoding.decompress(&d.body),
+            })
+            .collect(),
+        Encoding::None => payload.data,
         _ => unimplemented!(),
     }
 }
@@ -268,12 +224,11 @@ pub fn unmarshal(payload: &Payload) -> Vec<DataFrame> {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::executor::{Executor, LambdaExecutor};
     use arrow::array::{Array, StructArray};
     use arrow::csv;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::json;
-    use std::mem;
-    use std::slice;
     use std::sync::Arc;
     use std::time::Instant;
@@ -326,7 +281,7 @@ mod tests {
         assert_eq!(1856, flight_data_size);
     }

-    fn init_batches() -> RecordBatch {
+    fn init_batches() -> Vec<RecordBatch> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("tripduration", DataType::Utf8, false),
             Field::new("starttime", DataType::Utf8, false),
@@ -345,25 +300,42 @@ mod tests {
             Field::new("gender", DataType::Int8, false),
         ]));

+        let batch_size = 21275;
         let records: &[u8] =
             include_str!("../../test/data/JC-202011-citibike-tripdata.csv").as_bytes();
-        let mut reader = csv::Reader::new(records, schema, true, None, 21275, None, None);
-        reader.next().unwrap().unwrap()
+        let mut reader = csv::Reader::new(records, schema, true, None, batch_size, None, None);
+
+        let mut batches = vec![];
+        while let Some(Ok(batch)) = reader.next() {
+            batches.push(batch);
+        }
+        batches
     }

-    #[test]
-    fn flight_data_compression_ratio_2() {
-        let batch = init_batches();
+    #[tokio::test]
+    async fn flight_data_compression_ratio_2() -> Result<()> {
+        let batches = init_batches();

         // Arrow RecordBatch (in-memory)
-        let size: usize = batch
-            .columns()
-            .iter()
-            .map(|a| a.get_array_memory_size())
+        let size: usize = batches
+            .par_iter()
+            .map(|batch| {
+                batch
+                    .columns()
+                    .par_iter()
+                    .map(|a| a.get_array_memory_size())
+                    .sum::<usize>()
+            })
             .sum();
         assert_eq!(4661248, size);
         println!("Arrow RecordBatch data (in-memory): {}", size);

+        let batches = LambdaExecutor::coalesce_batches(vec![batches], size).await?;
+        assert_eq!(1, batches.len());
+        assert_eq!(1, batches[0].len());
+
+        let batch = &batches[0][0];
+
         // Option: Write the record batch out as JSON
         let buf = Vec::new();
         let mut writer = json::LineDelimitedWriter::new(buf);
@@ -379,9 +351,9 @@ mod tests {
         assert_eq!(21275, struct_array.len());
         // return the total number of bytes of memory occupied by the buffers owned by
         // this array.
-        assert_eq!(4659712, struct_array.get_buffer_memory_size());
+        assert_eq!(3412864, struct_array.get_buffer_memory_size());
         // return the total number of bytes of memory occupied physically by this array.
-        assert_eq!(4661048, struct_array.get_array_memory_size());
+        assert_eq!(3413960, struct_array.get_array_memory_size());
         println!(
             "Arrow Struct Array data: {}",
             struct_array.get_array_memory_size()
@@ -405,11 +377,12 @@ mod tests {
         // Option: Compress Arrow Flight data
         {
             for en in [Encoding::Snappy, Encoding::Lz4, Encoding::Zstd].iter() {
-                let now = Instant::now();
-                let (en_header, en_body) = (
-                    en.compress(&flight_data.data_header),
-                    en.compress(&flight_data.data_body),
+                let (h, b) = (
+                    flight_data.data_header.clone(),
+                    flight_data.data_body.clone(),
                 );
+                let now = Instant::now();
+                let (en_header, en_body) = (en.compress(h), en.compress(b));
                 let en_flight_data_size = en_header.len() + en_body.len();
                 println!("Compression time: {} ms", now.elapsed().as_millis());
@@ -428,18 +401,32 @@ mod tests {
                 assert_eq!(flight_data.data_body, de_body);
             }
         }
+
+        Ok(())
     }

     #[tokio::test]
     async fn serde_payload() -> Result<()> {
-        let batches = vec![init_batches()];
+        let batches = init_batches();
         let mut uuid_builder =
             UuidBuilder::new("SX72HzqFz1Qij4bP-00-2021-01-28T19:27:50.298504836", 10);
         let uuid = uuid_builder.next();

-        let value = Payload::to_value(&batches, uuid.clone());
+        let now = Instant::now();
+        let value = Payload::to_value(&batches, uuid.clone(), Encoding::default());
+        println!(
+            "serde payload to value (with compression) - time: {} ms",
+            now.elapsed().as_millis()
+        );
+        let payload1: Payload = serde_json::from_value(value.clone())?;

+        let now = Instant::now();
         let (de_batches, de_uuid) = Payload::to_batch(value);
+        println!(
+            "serde value to batch (with decompression) - time: {} ms",
+            now.elapsed().as_millis()
+        );
+
         {
             assert_eq!(batches.len(), de_batches.len());
             assert_eq!(batches[0].schema(), de_batches[0].schema());
@@ -449,110 +436,25 @@ mod tests {
             assert_eq!(uuid, de_uuid);
         }

-        let bytes = Payload::to_bytes(&batches, uuid);
-        let payload2: Payload = serde_json::from_slice(&bytes)?;
-        assert_eq!(payload1, payload2);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn transmute_data_frames() -> Result<()> {
-        #[repr(packed)]
-        pub struct DataFrameStruct {
-            /// Arrow Flight Data's header.
-            header: Vec<u8>,
-            /// Arrow Flight Data's body.
-            body: Vec<u8>,
-        }
-
-        let batch = init_batches();
-        let schema = batch.schema();
-        let batches = vec![batch.clone(), batch.clone(), batch];
-        let mut uuid_builder =
-            UuidBuilder::new("SX72HzqFz1Qij4bP-00-2021-01-28T19:27:50.298504836", 10);
-        let uuid = uuid_builder.next();
-
-        let options = arrow::ipc::writer::IpcWriteOptions::default();
-        let data_frames = (0..batches.len())
-            .map(|i| {
-                let (_, flight_data) = flight_data_from_arrow_batch(&batches[i], &options);
-                DataFrameStruct {
-                    header: flight_data.data_header,
-                    body: flight_data.data_body,
-                }
-            })
-            .collect::<Vec<DataFrameStruct>>();
-        unsafe {
-            println!(
-                "transmute data - raw data: {}",
-                data_frames[0].header.len() + data_frames[0].body.len(),
-            );
-        }
-
-        let p: *const DataFrameStruct = &data_frames[0];
-        let p: *const u8 = p as *const u8;
-        let d: &[u8] = unsafe { slice::from_raw_parts(p, mem::size_of::<DataFrameStruct>()) };
-
-        let (head, body, _tail) = unsafe { d.align_to::<DataFrameStruct>() };
-        assert!(head.is_empty(), "Data was not aligned");
-        let my_struct = &body[0];
-
-        unsafe {
-            assert_eq!(data_frames[0].header.len(), (*my_struct).header.len());
-            assert_eq!(data_frames[0].header, (*my_struct).header);
-            assert_eq!(data_frames[0].body.len(), (*my_struct).body.len());
-            assert_eq!(data_frames[0].body, (*my_struct).body);
-        }
-
-        let encoding = Encoding::Zstd;
-
-        // compress
         let now = Instant::now();
-        let event: bytes::Bytes = serde_json::to_vec(&Payload {
-            data: encoding.compress(&d),
-            uuid: uuid.clone(),
-            encoding: encoding.clone(),
-            schema,
-        })
-        .unwrap()
-        .into();
-        println!(
-            "transmute data - compression time: {} us",
-            now.elapsed().as_micros()
-        );
+        let bytes = Payload::to_vec(&batches, uuid, Encoding::default());
         println!(
-            "transmute data - compressed data: {}, type: {:?}",
-            event.len(),
-            encoding
+            "serde payload to bytes (with compression) - time: {} ms, size: {} bytes",
+            now.elapsed().as_millis(),
+            bytes.len()
        );

-        // decompress
-        let now = Instant::now();
-        let payload: Payload = serde_json::from_slice(&event).unwrap();
-        let de_uuid = payload.uuid.clone();
-        let encoded = payload.encoding.decompress(&payload.data);
+        let payload2: Payload = serde_json::from_slice(&bytes)?;
+        assert_eq!(payload1, payload2);

-        let (head, body, _tail) = unsafe { encoded.align_to::<DataFrameStruct>() };
+        let now = Instant::now();
+        let bytes = Encoding::Zstd.compress(bytes);
         println!(
-            "transmute data - decompression time: {} us",
-            now.elapsed().as_micros()
+            "serde Json bytes - time: {} ms, size: {} bytes",
+            now.elapsed().as_millis(),
+            bytes.len()
         );
-        let de_struct = &body[0];
-        assert!(head.is_empty(), "Data was not aligned");
-
-        unsafe {
-            assert_eq!(data_frames[0].header.len(), (*de_struct).header.len());
-            assert_eq!(data_frames[0].header, (*de_struct).header);
-            assert_eq!(data_frames[0].body.len(), (*de_struct).body.len());
-            assert_eq!(data_frames[0].body, (*de_struct).body);
-            assert_eq!(uuid, de_uuid);
-            println!(
-                "transmute data - decompress raw data: {}",
-                (*de_struct).header.len() + (*de_struct).body.len(),
-            );
-        }
-
         Ok(())
     }
 }
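Taken together, the new surface is `Payload::to_value` / `Payload::to_vec` on the send side and `Payload::to_batch` on the receive side, with the `Encoding` chosen per call. A usage sketch based on the signatures above (the `runtime::` module paths are an assumption from the file layout, and error handling is elided):

```rust
use arrow::record_batch::RecordBatch;
use runtime::encoding::Encoding; // assumed path: src/runtime/src/encoding.rs
use runtime::payload::{Payload, Uuid}; // assumed path: src/runtime/src/payload.rs

/// Sender: marshal batches and compress each frame with the chosen codec.
fn ship(batches: &[RecordBatch], uuid: Uuid) -> Vec<u8> {
    Payload::to_vec(batches, uuid, Encoding::Zstd)
}

/// Receiver: `to_batch` decompresses frame-by-frame (in parallel via rayon)
/// and rebuilds the Arrow record batches.
fn receive(event: serde_json::Value) -> (Vec<RecordBatch>, Uuid) {
    Payload::to_batch(event)
}
```

Because the `encoding` travels inside the `Payload` itself, the receiver needs no out-of-band signal about which codec the sender picked.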