Skip to content

Commit

Permalink
MultiForkByKeyProvider for all datagen
Browse files Browse the repository at this point in the history
  • Loading branch information
robertbastian committed Feb 16, 2022
1 parent 723cb02 commit f3995b3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 29 deletions.
58 changes: 29 additions & 29 deletions provider/blob/src/export/blob_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,34 @@ use icu_provider::export::DataExporter;
use icu_provider::iter::IterableDynProvider;
use icu_provider::prelude::*;
use icu_provider::serde::SerializeMarker;
use litemap::LiteMap;
use writeable::Writeable;
use zerovec::map2d::ZeroMap2d;
use std::sync::Mutex;

/// A data exporter that writes data to a single-file blob.
/// See the module-level docs for an example.
pub struct BlobExporter {
resources: LiteMap<(ResourceKeyHash, String), Vec<u8>>,
sink: Box<dyn std::io::Write>,
resources: Mutex<Vec<(ResourceKeyHash, String, Vec<u8>)>>,
sink: Mutex<Option<Box<dyn std::io::Write>>>,
}

unsafe impl Sync for BlobExporter {}
unsafe impl Send for BlobExporter {}

impl BlobExporter {
/// Create a [`BlobExporter`] that writes to the given I/O stream.
pub fn new_with_sink(sink: Box<dyn std::io::Write>) -> Self {
Self {
resources: LiteMap::new(),
sink,
resources: Mutex::new(Vec::new()),
sink: Mutex::new(Some(sink)),
}
}
}

impl Drop for BlobExporter {
fn drop(&mut self) {
if !self.resources.is_empty() {
panic!("Please call close before dropping FilesystemExporter");
if self.sink.lock().unwrap().is_some() {
panic!("Please call close before dropping BlobExporter");
}
}
}
Expand All @@ -52,34 +55,31 @@ impl DataExporter<SerializeMarker> for BlobExporter {
.load_payload(key, req)?
.take_payload()?
.serialize(&mut <dyn erased_serde::Serializer>::erase(&mut serializer))?;
self.resources.insert(
(
key.get_hash(),
req.options.writeable_to_string().into_owned(),
),
self.resources.lock().unwrap().push((
key.get_hash(),
req.options.writeable_to_string().into_owned(),
serializer.output.0,
);
));
}
Ok(())
}

fn close(&mut self) -> Result<(), DataError> {
// Convert from LiteMap<(String, String), Vec<u8>> to ZeroMap2d<str, str, [u8]>
let mut zm: ZeroMap2d<ResourceKeyHash, str, [u8]> =
ZeroMap2d::with_capacity(self.resources.len());
for ((key, option), bytes) in self.resources.iter() {
zm.insert(key, option, bytes);
if let Some(mut sink) = self.sink.lock().unwrap().take() {
// We are the first `close` and own the sink now
let zm = self.resources.lock().unwrap().drain(..).collect::<ZeroMap2d<_, _, _>>();
let blob = BlobSchema::V001(BlobSchemaV1 {
resources: zm.as_borrowed(),
});
log::info!("Serializing blob to output stream...");
let mut serializer = postcard::Serializer {
output: postcard::flavors::AllocVec(Vec::new()),
};
serde::Serialize::serialize(&blob, &mut serializer)?;
sink.write_all(&serializer.output.0)?;
Ok(())
} else {
Err(DataError::custom("Close was called twice"))
}
let blob = BlobSchema::V001(BlobSchemaV1 {
resources: zm.as_borrowed(),
});
log::info!("Serializing blob to output stream...");
let mut serializer = postcard::Serializer {
output: postcard::flavors::AllocVec(Vec::new()),
};
serde::Serialize::serialize(&blob, &mut serializer)?;
self.sink.write_all(&serializer.output.0)?;
self.resources.clear();
Ok(())
}
}
3 changes: 3 additions & 0 deletions provider/fs/src/export/fs_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ pub struct FilesystemExporter {
serializer: Box<dyn AbstractSerializer>,
}

unsafe impl Sync for FilesystemExporter {}
unsafe impl Send for FilesystemExporter {}

impl DataExporter<SerializeMarker> for FilesystemExporter {
fn put_key_with_options(
&mut self,
Expand Down
Binary file modified provider/testdata/data/testdata.postcard
Binary file not shown.

0 comments on commit f3995b3

Please sign in to comment.