Skip to content

Commit

Permalink
release_gil
Browse files Browse the repository at this point in the history
  • Loading branch information
franz101 authored and ion-elgreco committed Mar 7, 2024
1 parent 1ddb616 commit 0b61692
Showing 1 changed file with 46 additions and 44 deletions.
90 changes: 46 additions & 44 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1401,60 +1401,62 @@ fn write_to_deltalake(
writer_properties: Option<HashMap<String, Option<String>>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
let save_mode = mode.parse().map_err(PythonError::from)?;
py.allow_threads(|| {
let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
let save_mode = mode.parse().map_err(PythonError::from)?;

let options = storage_options.clone().unwrap_or_default();
let table = rt()?
.block_on(DeltaOps::try_from_uri_with_storage_options(
&table_uri, options,
))
.map_err(PythonError::from)?;
let options = storage_options.clone().unwrap_or_default();
let table = rt()?
.block_on(DeltaOps::try_from_uri_with_storage_options(
&table_uri, options,
))
.map_err(PythonError::from)?;

let mut builder = table
.write(batches)
.with_save_mode(save_mode)
.with_write_batch_size(max_rows_per_group as usize);
if let Some(schema_mode) = schema_mode {
builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
}
if let Some(partition_columns) = partition_by {
builder = builder.with_partition_columns(partition_columns);
}
let mut builder = table
.write(batches)
.with_save_mode(save_mode)
.with_write_batch_size(max_rows_per_group as usize);
if let Some(schema_mode) = schema_mode {
builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
}
if let Some(partition_columns) = partition_by {
builder = builder.with_partition_columns(partition_columns);
}

if let Some(writer_props) = writer_properties {
builder = builder.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}
if let Some(writer_props) = writer_properties {
builder = builder.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}

if let Some(name) = &name {
builder = builder.with_table_name(name);
};
if let Some(name) = &name {
builder = builder.with_table_name(name);
};

if let Some(description) = &description {
builder = builder.with_description(description);
};
if let Some(description) = &description {
builder = builder.with_description(description);
};

if let Some(predicate) = predicate {
builder = builder.with_replace_where(predicate);
};
if let Some(predicate) = predicate {
builder = builder.with_replace_where(predicate);
};

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};
if let Some(config) = configuration {
builder = builder.with_configuration(config);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};
if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Ok(())
})
}

#[pyfunction]
Expand Down

0 comments on commit 0b61692

Please sign in to comment.