Skip to content

Commit

Permalink
Sync Action attributes with delta
Browse files Browse the repository at this point in the history
  • Loading branch information
fvaleye committed Aug 12, 2021
1 parent ca667a2 commit 15030a6
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 86 deletions.
6 changes: 3 additions & 3 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ struct RawDeltaTableMetaData {
#[pyo3(get)]
id: String,
#[pyo3(get)]
name: Option<String>,
name: String,
#[pyo3(get)]
description: Option<String>,
description: String,
#[pyo3(get)]
partition_columns: Vec<String>,
#[pyo3(get)]
created_time: deltalake::DeltaDataTypeTimestamp,
created_time: Option<deltalake::DeltaDataTypeTimestamp>,
#[pyo3(get)]
configuration: HashMap<String, Option<String>>,
}
Expand Down
101 changes: 50 additions & 51 deletions rust/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub struct Add {
/// that would not affect the final results.
pub data_change: bool,
/// Contains statistics (e.g., count, min/max values for columns) about the data in this file
pub stats: Option<String>,
pub stats: String,
/// Contains statistics (e.g., count, min/max values for columns) about the data in this file in
/// raw parquet format. This field needs to be written when statistics are available and the
/// table property: delta.checkpoint.writeStatsAsStruct is set to true.
Expand All @@ -178,7 +178,7 @@ pub struct Add {
#[serde(skip_serializing, skip_deserializing)]
pub stats_parsed: Option<parquet::record::Row>,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,
pub tags: HashMap<String, Option<String>>,
}

impl Add {
Expand Down Expand Up @@ -245,18 +245,18 @@ impl Add {
estr,
))
})?;
re.tags = Some(tags);
re.tags = tags;
}
_ => {
re.tags = None;
re.tags = HashMap::new();
}
},
"stats" => match record.get_string(i) {
Ok(stats) => {
re.stats = Some(stats.clone());
re.stats = stats.clone();
}
_ => {
re.stats = None;
re.stats = String::new();
}
},
"stats_parsed" => match record.get_group(i) {
Expand All @@ -283,9 +283,7 @@ impl Add {
/// Returns the serde_json representation of stats contained in the action if present.
/// Since stats are defined as optional in the protocol, this may be None.
pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
self.stats
.as_ref()
.map_or(Ok(None), |s| serde_json::from_str(s))
serde_json::from_str(&self.stats)
}

/// Returns the composite HashMap representation of stats contained in the action if present.
Expand Down Expand Up @@ -396,17 +394,17 @@ pub struct MetaData {
/// Unique identifier for this table
pub id: Guid,
/// User-provided identifier for this table
pub name: Option<String>,
pub name: String,
/// User-provided description for this table
pub description: Option<String>,
pub description: String,
/// Specification of the encoding for the files stored in the table
pub format: Format,
/// Schema of the table
pub schema_string: String,
/// An array containing the names of columns by which the data should be partitioned
pub partition_columns: Vec<String>,
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: DeltaDataTypeTimestamp,
pub created_time: Option<DeltaDataTypeTimestamp>,
/// A map containing configuration options for the table
pub configuration: HashMap<String, Option<String>>,
}
Expand All @@ -425,14 +423,18 @@ impl MetaData {
.map_err(|_| gen_action_type_error("metaData", "id", "string"))?
.clone();
}
"name" => match record.get_string(i) {
Ok(s) => re.name = Some(s.clone()),
_ => re.name = None,
},
"description" => match record.get_string(i) {
Ok(s) => re.description = Some(s.clone()),
_ => re.description = None,
},
"name" => {
re.name = record
.get_string(i)
.map_err(|_| gen_action_type_error("metaData", "name", "string"))?
.clone();
}
"description" => {
re.description = record
.get_string(i)
.map_err(|_| gen_action_type_error("metaData", "description", "string"))?
.clone();
}
"partitionColumns" => {
let columns_list = record.get_list(i).map_err(|_| {
gen_action_type_error("metaData", "partitionColumns", "list")
Expand All @@ -459,9 +461,10 @@ impl MetaData {
.clone();
}
"createdTime" => {
re.created_time = record
.get_long(i)
.map_err(|_| gen_action_type_error("metaData", "createdTime", "long"))?;
re.created_time =
Some(record.get_long(i).map_err(|_| {
gen_action_type_error("metaData", "createdTime", "long")
})?);
}
"configuration" => {
let configuration_map = record
Expand Down Expand Up @@ -537,18 +540,18 @@ pub struct Remove {
/// The path of the file that is removed from the table.
pub path: String,
/// The timestamp when the remove was added to table state.
pub deletion_timestamp: DeltaDataTypeTimestamp,
pub deletion_timestamp: Option<DeltaDataTypeTimestamp>,
/// Whether data is changed by the remove. A table optimize will report this as false for
/// example, since it adds and removes files by combining many files into one.
pub data_change: bool,
/// When true the fields partitionValues, size, and tags are present
pub extended_file_metadata: Option<bool>,
pub extended_file_metadata: bool,
/// A map from partition column to value for this file.
pub partition_values: Option<HashMap<String, Option<String>>>,
pub partition_values: HashMap<String, Option<String>>,
/// Size of this file in bytes
pub size: Option<DeltaDataTypeLong>,
pub size: DeltaDataTypeLong,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,
pub tags: HashMap<String, Option<String>>,
}

impl Remove {
Expand All @@ -571,14 +574,14 @@ impl Remove {
.map_err(|_| gen_action_type_error("remove", "dataChange", "bool"))?;
}
"extendedFileMetadata" => {
re.extended_file_metadata = Some(record.get_bool(i).map_err(|_| {
re.extended_file_metadata = record.get_bool(i).map_err(|_| {
gen_action_type_error("remove", "extendedFileMetadata", "bool")
})?);
})?;
}
"deletionTimestamp" => {
re.deletion_timestamp = record.get_long(i).map_err(|_| {
re.deletion_timestamp = Some(record.get_long(i).map_err(|_| {
gen_action_type_error("remove", "deletionTimestamp", "long")
})?;
})?);
}
"partitionValues" => match record.get_map(i) {
Ok(_) => {
Expand All @@ -596,9 +599,9 @@ impl Remove {
estr,
))
})?;
re.partition_values = Some(partitionValues);
re.partition_values = partitionValues;
}
_ => re.partition_values = None,
_ => re.partition_values = HashMap::new(),
},
"tags" => match record.get_map(i) {
Ok(tags_map) => {
Expand All @@ -610,18 +613,16 @@ impl Remove {
estr,
))
})?;
re.tags = Some(tags);
re.tags = tags;
}
_ => {
re.tags = None;
re.tags = HashMap::new();
}
},
"size" => {
re.size = Some(
record
.get_long(i)
.map_err(|_| gen_action_type_error("remove", "size", "long"))?,
);
re.size = record
.get_long(i)
.map_err(|_| gen_action_type_error("remove", "size", "long"))?
}
_ => {
log::warn!(
Expand Down Expand Up @@ -877,21 +878,19 @@ mod tests {
let add_action = Add::from_parquet_record(&add_record).unwrap();

assert_eq!(add_action.partition_values.len(), 0);
assert_eq!(add_action.stats, None);
assert_eq!(add_action.stats, String::new());
}

#[test]
fn test_load_table_stats() {
let action = Add {
stats: Some(
serde_json::json!({
"numRecords": 22,
"minValues": {"a": 1, "nested": {"b": 2, "c": "a"}},
"maxValues": {"a": 10, "nested": {"b": 20, "c": "z"}},
"nullCount": {"a": 1, "nested": {"b": 0, "c": 1}},
})
.to_string(),
),
stats: serde_json::json!({
"numRecords": 22,
"minValues": {"a": 1, "nested": {"b": 2, "c": "a"}},
"maxValues": {"a": 10, "nested": {"b": 20, "c": "z"}},
"nullCount": {"a": 1, "nested": {"b": 0, "c": 1}},
})
.to_string(),
..Default::default()
};

Expand Down
22 changes: 11 additions & 11 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,26 +192,26 @@ pub struct DeltaTableMetaData {
/// Unique identifier for this table
pub id: Guid,
/// User-provided identifier for this table
pub name: Option<String>,
pub name: String,
/// User-provided description for this table
pub description: Option<String>,
pub description: String,
/// Specification of the encoding for the files stored in the table
pub format: action::Format,
/// Schema of the table
pub schema: Schema,
/// An array containing the names of columns by which the data should be partitioned
pub partition_columns: Vec<String>,
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: DeltaDataTypeTimestamp,
pub created_time: Option<DeltaDataTypeTimestamp>,
/// table properties
pub configuration: HashMap<String, Option<String>>,
}

impl DeltaTableMetaData {
/// Create metadata for a DeltaTable from scratch
pub fn new(
name: Option<String>,
description: Option<String>,
name: String,
description: String,
format: Option<action::Format>,
schema: Schema,
partition_columns: Vec<String>,
Expand All @@ -226,7 +226,7 @@ impl DeltaTableMetaData {
format: format.unwrap_or_default(),
schema,
partition_columns,
created_time: Utc::now().timestamp_millis(),
created_time: Some(Utc::now().timestamp_millis()),
configuration,
}
}
Expand Down Expand Up @@ -919,7 +919,7 @@ impl DeltaTable {
Ok(self
.get_tombstones()
.iter()
.filter(|tombstone| tombstone.deletion_timestamp < delete_before_timestamp)
.filter(|tombstone| tombstone.deletion_timestamp.unwrap_or(0) < delete_before_timestamp)
.map(|tombstone| tombstone.path.as_str())
.collect::<HashSet<_>>())
}
Expand Down Expand Up @@ -1330,9 +1330,9 @@ impl<'a> DeltaTransaction<'a> {
size: bytes.len() as i64,
partition_values_parsed: None,
data_change: true,
stats: None,
stats: String::new(),
stats_parsed: None,
tags: None,
tags: HashMap::new(),
}));

Ok(())
Expand Down Expand Up @@ -1662,8 +1662,8 @@ mod tests {
);

let delta_md = DeltaTableMetaData::new(
Some("Test Table Create".to_string()),
Some("This table is made to test the create function for a DeltaTable".to_string()),
"Test Table Create".to_string(),
"This table is made to test the create function for a DeltaTable".to_string(),
None,
test_schema,
vec![],
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/azure_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod azure {
deltalake::action::Remove {
path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet"
.to_string(),
deletion_timestamp: 1587968596250,
deletion_timestamp: Some(1587968596250),
data_change: true,
..Default::default()
}
Expand Down
4 changes: 2 additions & 2 deletions rust/tests/concurrent_writes_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ impl Worker {
partition_values_parsed: None,
modification_time: 1564524294000,
data_change: true,
stats: None,
stats: String::new(),
stats_parsed: None,
tags: None,
tags: HashMap::new(),
}));
tx.commit(None).await.unwrap()
}
Expand Down
10 changes: 5 additions & 5 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn read_delta_2_0_table_without_version() {
tombstones[0],
deltalake::action::Remove {
path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
deletion_timestamp: 1564524298213,
deletion_timestamp: Some(1564524298213),
data_change: false,
..Default::default()
}
Expand Down Expand Up @@ -134,11 +134,11 @@ async fn read_delta_8_0_table_without_version() {
tombstones[0],
deltalake::action::Remove {
path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
deletion_timestamp: 1615043776198,
deletion_timestamp: Some(1615043776198),
data_change: true,
extended_file_metadata: Some(true),
partition_values: Some(HashMap::new()),
size: Some(445),
extended_file_metadata: true,
partition_values: HashMap::new(),
size: 445,
..Default::default()
}
);
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/read_simple_table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn read_simple_table() {
tombstones[0],
deltalake::action::Remove {
path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
deletion_timestamp: 1587968596250,
deletion_timestamp: Some(1587968596250),
data_change: true,
..Default::default()
}
Expand Down
Loading

0 comments on commit 15030a6

Please sign in to comment.