Skip to content

Commit

Permalink
Clean-up: inline methods into trait definition, for less indirection
Browse files Browse the repository at this point in the history
  • Loading branch information
huonw committed May 19, 2023
1 parent 65c8e16 commit 1800f55
Showing 1 changed file with 51 additions and 77 deletions.
128 changes: 51 additions & 77 deletions src/rust/engine/fs/store/src/remote/reapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,35 +111,6 @@ impl Provider {
})
}

async fn store_bytes_source(
&self,
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError> {
let len = digest.size_bytes;

let max_batch_total_size_bytes = {
let capabilities = self.get_capabilities().await?;

capabilities
.cache_capabilities
.as_ref()
.map(|c| c.max_batch_total_size_bytes as usize)
.unwrap_or_default()
};

let batch_api_allowed_by_local_config = len <= self.batch_api_size_limit;
let batch_api_allowed_by_server_config =
max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes;

let result = if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
self.store_bytes_source_batch(digest, bytes).await
} else {
self.store_bytes_source_stream(digest, bytes).await
};
result
}

async fn store_bytes_source_batch(
&self,
digest: Digest,
Expand Down Expand Up @@ -220,7 +191,56 @@ impl Provider {
future.await
}

async fn load_monomorphic(
async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, ByteStoreError> {
let capabilities_fut = async {
let mut request = remexec::GetCapabilitiesRequest::default();
if let Some(s) = self.instance_name.as_ref() {
request.instance_name = s.clone();
}

let mut client = self.capabilities_client.as_ref().clone();
client
.get_capabilities(request)
.await
.map(|r| r.into_inner())
.map_err(ByteStoreError::Grpc)
};

self
.capabilities_cell
.get_or_try_init(capabilities_fut)
.await
}
}

#[async_trait]
impl ByteStoreProvider for Provider {
async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
let len = digest.size_bytes;

let max_batch_total_size_bytes = {
let capabilities = self.get_capabilities().await.map_err(|e| e.to_string())?;

capabilities
.cache_capabilities
.as_ref()
.map(|c| c.max_batch_total_size_bytes as usize)
.unwrap_or_default()
};

let batch_api_allowed_by_local_config = len <= self.batch_api_size_limit;
let batch_api_allowed_by_server_config =
max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes;

let result = if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
self.store_bytes_source_batch(digest, bytes).await
} else {
self.store_bytes_source_stream(digest, bytes).await
};
result.map_err(|e| e.to_string())
}

async fn load(
&self,
digest: Digest,
destination: &mut dyn LoadDestination,
Expand Down Expand Up @@ -299,7 +319,7 @@ impl Provider {
result
}

async fn list_missing_digests_(
async fn list_missing_digests(
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String> {
Expand Down Expand Up @@ -328,52 +348,6 @@ impl Provider {
.collect::<Result<HashSet<_>, _>>()
}

async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, ByteStoreError> {
let capabilities_fut = async {
let mut request = remexec::GetCapabilitiesRequest::default();
if let Some(s) = self.instance_name.as_ref() {
request.instance_name = s.clone();
}

let mut client = self.capabilities_client.as_ref().clone();
client
.get_capabilities(request)
.await
.map(|r| r.into_inner())
.map_err(ByteStoreError::Grpc)
};

self
.capabilities_cell
.get_or_try_init(capabilities_fut)
.await
}
}

#[async_trait]
impl ByteStoreProvider for Provider {
async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
self
.store_bytes_source(digest, bytes)
.await
.map_err(|e| e.to_string())
}

async fn load(
&self,
digest: Digest,
destination: &mut dyn LoadDestination,
) -> Result<bool, String> {
self.load_monomorphic(digest, destination).await
}

async fn list_missing_digests(
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String> {
self.list_missing_digests_(digests).await
}

fn chunk_size_bytes(&self) -> usize {
self.chunk_size_bytes
}
Expand Down

0 comments on commit 1800f55

Please sign in to comment.