Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Jul 4, 2024
1 parent 070ea55 commit f49acf1
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions vortex-ipc/src/io/object_store.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::future::Future;
use std::io::Cursor;
use std::ops::Range;
use std::{io, mem};

use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use object_store::{ObjectStore, WriteMultipart};
use vortex_buffer::io_buf::IoBuf;
use vortex_buffer::Buffer;
use vortex_error::VortexResult;
Expand All @@ -20,7 +21,10 @@ pub trait ObjectStoreExt {

fn vortex_reader(&self, location: &Path) -> impl VortexReadAt;

fn vortex_writer(&self, location: &Path) -> impl VortexWrite;
fn vortex_writer(
&self,
location: &Path,
) -> impl Future<Output = VortexResult<impl VortexWrite>>;
}

impl<O: ObjectStore> ObjectStoreExt for O {
Expand All @@ -37,8 +41,11 @@ impl<O: ObjectStore> ObjectStoreExt for O {
ObjectStoreReadAt::new(self, location)
}

fn vortex_writer(&self, location: &Path) -> impl VortexWrite {
ObjectStoreWriter::new(self, location)
async fn vortex_writer(&self, location: &Path) -> VortexResult<impl VortexWrite> {
Ok(ObjectStoreWriter::new(WriteMultipart::new_with_chunk_size(
self.put_multipart(location).await?,
10 * 1024 * 1024,
)))
}
}

Expand Down Expand Up @@ -68,36 +75,41 @@ impl<'a, 'b, O: ObjectStore> VortexReadAt for ObjectStoreReadAt<'a, 'b, O> {
}
}

pub struct ObjectStoreWriter<'a, 'b, O: ObjectStore> {
object_store: &'a O,
location: &'b Path,
pub struct ObjectStoreWriter {
multipart: Option<WriteMultipart>,
}

impl<'a, 'b, O: ObjectStore> ObjectStoreWriter<'a, 'b, O> {
pub fn new(object_store: &'a O, location: &'b Path) -> Self {
impl ObjectStoreWriter {
pub fn new(multipart: WriteMultipart) -> Self {
Self {
object_store,
location,
multipart: Some(multipart),
}
}
}

impl<'a, 'b, O: ObjectStore> VortexWrite for ObjectStoreWriter<'a, 'b, O> {
impl VortexWrite for ObjectStoreWriter {
async fn write_all<B: IoBuf>(&mut self, buffer: B) -> std::io::Result<B> {
self.object_store
.put(
self.location,
PutPayload::from_bytes(Bytes::copy_from_slice(buffer.as_slice())),
)
.await?;
self.multipart
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "multipart already finished"))
.map(|mp| mp.write(buffer.as_slice()))?;
Ok(buffer)
}

async fn flush(&mut self) -> std::io::Result<()> {
Ok(())
Ok(self
.multipart
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "multipart already finished"))
.map(|mp| mp.wait_for_capacity(0))?
.await?)
}

async fn shutdown(&mut self) -> std::io::Result<()> {
let mp = mem::take(&mut self.multipart);
mp.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "multipart already finished"))?
.finish()
.await?;
Ok(())
}
}

0 comments on commit f49acf1

Please sign in to comment.