From 36c50685d7f6c6ea2cf1756c83dfc49681be771c Mon Sep 17 00:00:00 2001 From: ccamel Date: Fri, 28 Apr 2023 18:50:44 +0200 Subject: [PATCH] feat(objectarium): implement compression of objects --- contracts/okp4-law-stone/src/contract.rs | 8 +- contracts/okp4-objectarium/src/contract.rs | 198 ++++++++++++++------- contracts/okp4-objectarium/src/error.rs | 15 ++ contracts/okp4-objectarium/src/lib.rs | 1 + contracts/okp4-objectarium/src/state.rs | 48 +++++ 5 files changed, 204 insertions(+), 66 deletions(-) diff --git a/contracts/okp4-law-stone/src/contract.rs b/contracts/okp4-law-stone/src/contract.rs index 9a90941a..99b3ad2f 100644 --- a/contracts/okp4-law-stone/src/contract.rs +++ b/contracts/okp4-law-stone/src/contract.rs @@ -33,6 +33,7 @@ pub fn instantiate( let store_msg = StorageMsg::StoreObject { data: msg.program.clone(), pin: true, + compression_algorithm: None, }; let store_program_msg = WasmMsg::Execute { @@ -333,9 +334,14 @@ mod tests { WasmMsg::Execute { msg, .. } => { let result: StorageMsg = from_binary(msg).unwrap(); match result { - StorageMsg::StoreObject { data, pin } => { + StorageMsg::StoreObject { + data, + pin, + compression_algorithm, + } => { assert_eq!(data, program); assert!(pin, "the main program should be pinned"); + assert_eq!(compression_algorithm, None); } _ => panic!("storage message should be a StoreObject message"), } diff --git a/contracts/okp4-objectarium/src/contract.rs b/contracts/okp4-objectarium/src/contract.rs index 7c301b0b..af807664 100644 --- a/contracts/okp4-objectarium/src/contract.rs +++ b/contracts/okp4-objectarium/src/contract.rs @@ -43,7 +43,11 @@ pub fn execute( msg: ExecuteMsg, ) -> Result<Response, ContractError> { match msg { - ExecuteMsg::StoreObject { data, pin } => execute::store_object(deps, info, data, pin), + ExecuteMsg::StoreObject { + data, + pin, + compression_algorithm, + } => execute::store_object(deps, info, data, pin, compression_algorithm), ExecuteMsg::PinObject { id } => execute::pin_object(deps, info, id), ExecuteMsg::UnpinObject { id } => execute::unpin_object(deps, info, id), ExecuteMsg::ForgetObject { id } => execute::forget_object(deps, info, id), @@ -52,6 +56,8 @@ pub fn execute { pub mod execute { use super::*; + use crate::compress::CompressionAlgorithm; + use crate::msg; use crate::state::BucketLimits; use crate::ContractError::ObjectAlreadyPinned; use cosmwasm_std::{Order, StdError, Uint128}; @@ -62,62 +68,88 @@ pub mod execute { info: MessageInfo, data: Binary, pin: bool, + compression_algorithm: Option<msg::CompressionAlgorithm>, ) -> Result<Response, ContractError> { let size = (data.len() as u128).into(); - let bucket_info = - BUCKET.update(deps.storage, |mut bucket| -> Result<_, ContractError> { - bucket.stat.size += size; - bucket.stat.object_count += Uint128::one(); - match bucket.limits { - BucketLimits { - max_object_size: Some(max), - .. - } if size > max => { - Err(BucketError::MaxObjectSizeLimitExceeded(size, max).into()) - } - BucketLimits { - max_objects: Some(max), - .. - } if bucket.stat.object_count > max => Err( - BucketError::MaxObjectsLimitExceeded(bucket.stat.object_count, max).into(), - ), - BucketLimits { - max_object_pins: Some(max), - .. - } if pin && max < Uint128::one() => { - Err(BucketError::MaxObjectPinsLimitExceeded(Uint128::one(), max).into()) - } - BucketLimits { - max_total_size: Some(max), - ..
- } if bucket.stat.size > max => { - Err(BucketError::MaxTotalSizeLimitExceeded(bucket.stat.size, max).into()) - } - _ => Ok(bucket), - } - })?; + let bucket = BUCKET.load(deps.storage)?; + let compression: CompressionAlgorithm = compression_algorithm + .map(|a| a.into()) + .or_else(|| { + bucket + .limits + .accepted_compression_algorithms + .first() + .cloned() + }) + .unwrap_or(CompressionAlgorithm::Passthrough); + + // pre-conditions + if let Some(limit) = bucket.limits.max_object_size { + if size > limit { + return Err(BucketError::MaxObjectSizeLimitExceeded(size, limit).into()); + } + } + if let Some(limit) = bucket.limits.max_objects { + let value = bucket.stat.object_count + Uint128::one(); + if value > limit { + return Err(BucketError::MaxObjectsLimitExceeded(value, limit).into()); + } + } + if let Some(limit) = bucket.limits.max_object_pins { + if pin && limit.is_zero() { + return Err(BucketError::MaxObjectPinsLimitExceeded(Uint128::one(), limit).into()); + } + } + if let Some(limit) = bucket.limits.max_total_size { + let value = bucket.stat.size + size; + if value > limit { + return Err(BucketError::MaxTotalSizeLimitExceeded(value, limit).into()); + } + } + if !bucket.limits.accepted_compression_algorithms.contains(&compression) { + return Err(BucketError::CompressionAlgorithmNotAccepted( + compression, + bucket.limits.accepted_compression_algorithms, + ) + .into()); + } + + // store object data + let id = crypto::hash( + &crypto::HashAlgorithm::from(bucket.config.hash_algorithm_or_default()), + &data.0, + ); + let data_path = DATA.key(id.clone()); + if data_path.has(deps.storage) { + return Err(ContractError::Bucket(BucketError::ObjectAlreadyStored)); + } + let compressed_data = compression.compress(&data.0)?; + + data_path.save(deps.storage, &compressed_data.to_vec())?; + // store object + let compressed_size = (compressed_data.len() as u128).into(); let object = &Object { - id: crypto::hash( - &crypto::HashAlgorithm::from(bucket_info.config.hash_algorithm_or_default()), - &data.0, - ), + id: id.clone(), owner: info.sender.clone(), size, pin_count: if pin { Uint128::one() } else { Uint128::zero() }, + compression, + compressed_size, }; - let res = Response::new() - .add_attribute("action", "store_object") - .add_attribute("id", object.id.clone()); - let data_path = DATA.key(object.id.clone()); - if data_path.has(deps.storage) { - return Err(ContractError::Bucket(BucketError::ObjectAlreadyStored)); - } + objects().save(deps.storage, id, object)?; - data_path.save(deps.storage, &data.0)?; - objects().save(deps.storage, object.id.clone(), object)?; + // save bucket stats + BUCKET.update(deps.storage, |mut bucket| -> Result<_, ContractError> { + let stat = &mut bucket.stat; + stat.size += size; + stat.object_count += Uint128::one(); + stat.compressed_size += compressed_size; + Ok(bucket) + })?; + // save pin if pin { pins().save( deps.storage, @@ -129,7 +161,9 @@ pub mod execute { )?; } - Ok(res) + Ok(Response::new() + .add_attribute("action", "store_object") + .add_attribute("id", object.id.clone())) } pub fn pin_object( @@ -238,8 +272,8 @@ pub mod execute { } #[cfg_attr(not(feature = "library"), entry_point)] -pub fn query(deps: Deps, _env: Env, msg: QueryMsg) -> StdResult<Binary> { - match msg { +pub fn query(deps: Deps, _env: Env, msg: QueryMsg) -> Result<Binary, ContractError> { + Ok(match msg { QueryMsg::Bucket {} => to_binary(&query::bucket(deps)?), QueryMsg::Object { id } => to_binary(&query::object(deps, id)?), QueryMsg::ObjectData { id } => to_binary(&query::data(deps, id)?), @@ -251,7 +285,7 @@ pub fn query(deps:
Deps, _env: Env, msg: QueryMsg) -> StdResult<Binary> { QueryMsg::ObjectPins { id, after, first } => { to_binary(&query::object_pins(deps, id, after, first)?) } - } + }?) } pub mod query { @@ -263,7 +297,7 @@ pub mod query { use crate::pagination::PaginationHandler; use cosmwasm_std::{Addr, Order}; - pub fn bucket(deps: Deps) -> StdResult<BucketResponse> { + pub fn bucket(deps: Deps) -> Result<BucketResponse, ContractError> { let bucket = BUCKET.load(deps.storage)?; Ok(BucketResponse { @@ -274,14 +308,16 @@ pub mod query { }) } - pub fn object(deps: Deps, id: ObjectId) -> StdResult<ObjectResponse> { - objects() - .load(deps.storage, id) - .map(|object| (&object).into()) + pub fn object(deps: Deps, id: ObjectId) -> Result<ObjectResponse, ContractError> { + let object = objects().load(deps.storage, id)?; + Ok((&object).into()) } - pub fn data(deps: Deps, id: ObjectId) -> StdResult<Binary> { - DATA.load(deps.storage, id).map(Binary::from) + pub fn data(deps: Deps, id: ObjectId) -> Result<Binary, ContractError> { + let compression = objects().load(deps.storage, id.clone())?.compression; + let data = DATA.load(deps.storage, id)?; + let decompressed_data = compression.decompress(&data)?; + Ok(Binary::from(decompressed_data.into_vec())) } pub fn fetch_objects( @@ -378,9 +414,9 @@ mod tests { use super::*; use crate::error::BucketError; use crate::msg::{ - BucketConfig, BucketLimits, BucketLimitsBuilder, BucketResponse, HashAlgorithm, - ObjectPinsResponse, ObjectResponse, ObjectsResponse, PageInfo, PaginationConfig, - PaginationConfigBuilder, + BucketConfig, BucketLimits, BucketLimitsBuilder, BucketResponse, CompressionAlgorithm, + HashAlgorithm, ObjectPinsResponse, ObjectResponse, ObjectsResponse, PageInfo, + PaginationConfig, PaginationConfigBuilder, }; use base64::{engine::general_purpose, Engine as _}; use cosmwasm_std::testing::{mock_dependencies, mock_env, mock_info}; @@ -407,7 +443,16 @@ mod tests { let value: BucketResponse = from_binary(&res).unwrap(); assert_eq!("foo", value.name); assert_eq!(value.config, BucketConfig::default()); - assert_eq!(value.limits, BucketLimits::default()); + assert_eq!( + value.limits, + BucketLimits { + accepted_compression_algorithms: Some(vec![ + CompressionAlgorithm::Passthrough, + CompressionAlgorithm::Lz4, + ]), + ..BucketLimits::default() + } + ); assert_eq!(value.pagination.max_page_size, Some(30)); assert_eq!(value.pagination.default_page_size, Some(10)); @@ -721,6 +766,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(content).unwrap(), pin: *pin, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let res = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); assert_eq!( @@ -754,7 +800,7 @@ mod tests { assert_eq!( pins().has( &deps.storage, - (expected_hash.to_string(), info.clone().sender) + (expected_hash.to_string(), info.clone().sender), ), *pin, ); @@ -800,6 +846,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(object.as_str()).unwrap(), pin: true, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info.clone(), msg.clone()).unwrap(); assert_eq!( @@ -895,11 +942,13 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(obj1.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(obj2.as_str()).unwrap(), pin: true, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let res = execute(deps.as_mut(), mock_env(), info.clone(), msg); @@
-930,7 +979,7 @@ mod tests { .err() .unwrap() { - NotFound { .. } => (), + ContractError::Std(NotFound { .. }) => (), _ => panic!("assertion failed"), } @@ -938,6 +987,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: true, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -958,6 +1008,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -998,7 +1049,7 @@ mod tests { .err() .unwrap() { - NotFound { .. } => (), + ContractError::Std(NotFound { .. }) => (), _ => panic!("assertion failed"), } @@ -1006,6 +1057,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: data.clone(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info, msg).unwrap(); @@ -1255,6 +1307,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1262,6 +1315,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1269,6 +1323,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1495,6 +1550,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1502,6 +1558,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1509,6 +1566,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1594,18 +1652,21 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info1.clone(), msg).unwrap(); let data = general_purpose::STANDARD.encode("object2"); let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info1, msg).unwrap(); let data = general_purpose::STANDARD.encode("object3"); let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info2, msg).unwrap(); @@ -1639,6 
+1700,8 @@ mod tests { owner: "creator2".to_string(), is_pinned: false, size: 7u128.into(), + compressed_size: 7u128.into(), + compression_algorithm: CompressionAlgorithm::Passthrough, } ); } @@ -1661,6 +1724,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info1.clone(), msg).unwrap(); // 1: 445008b7f2932922bdb184771d9978516a4f89d77000c2d6eab18b0894aac3a7 @@ -1668,6 +1732,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: true, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; execute(deps.as_mut(), mock_env(), info2, msg).unwrap(); // 2: abafa4428bdc8c34dae28bbc17303a62175f274edf59757b3e9898215a428a56 @@ -1763,7 +1828,7 @@ mod tests { .err() .unwrap() { - NotFound { .. } => (), + ContractError::Std(NotFound { .. }) => (), _ => panic!("assertion failed"), } } @@ -1875,6 +1940,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1882,6 +1948,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); @@ -1889,6 +1956,7 @@ mod tests { let msg = ExecuteMsg::StoreObject { data: Binary::from_base64(data.as_str()).unwrap(), pin: false, + compression_algorithm: Some(CompressionAlgorithm::Passthrough), }; let _ = execute(deps.as_mut(), mock_env(), info.clone(), msg).unwrap(); diff --git a/contracts/okp4-objectarium/src/error.rs b/contracts/okp4-objectarium/src/error.rs index 565a292f..48baead5 100644 --- a/contracts/okp4-objectarium/src/error.rs +++ b/contracts/okp4-objectarium/src/error.rs @@ -1,3 +1,4 @@ +use crate::compress::{CompressionAlgorithm, CompressionError}; use cosmwasm_std::{StdError, Uint128}; use thiserror::Error; @@ -11,6 +12,9 @@ pub enum ContractError { #[error("Object is already pinned")] ObjectAlreadyPinned {}, + + #[error("Compression error: {0}")] + CompressionError(String), } #[derive(Error, Debug, Eq, PartialEq)] pub enum BucketError { @@ -32,4 +36,15 @@ pub enum BucketError { #[error("Object is already stored")] ObjectAlreadyStored, + + #[error("Compression algorithm is not accepted: {0:?} (accepted: \"{1:?}\")")] + CompressionAlgorithmNotAccepted(CompressionAlgorithm, Vec<CompressionAlgorithm>), +} + +impl From<CompressionError> for ContractError { + fn from(err: CompressionError) -> Self { + match err { + CompressionError::Error(err) => ContractError::CompressionError(err), + } + } } diff --git a/contracts/okp4-objectarium/src/lib.rs b/contracts/okp4-objectarium/src/lib.rs index 1cc8298b..59c0e3c3 100644 --- a/contracts/okp4-objectarium/src/lib.rs +++ b/contracts/okp4-objectarium/src/lib.rs @@ -1,3 +1,4 @@ +pub mod compress; pub mod contract; pub mod crypto; mod cursor; diff --git a/contracts/okp4-objectarium/src/state.rs b/contracts/okp4-objectarium/src/state.rs index 05af1645..0bc3b2fe 100644 --- a/contracts/okp4-objectarium/src/state.rs +++ b/contracts/okp4-objectarium/src/state.rs @@ -1,9 +1,11 @@ +use crate::compress::CompressionAlgorithm; use crate::error::BucketError; use crate::error::BucketError::EmptyName; use crate::msg; use crate::msg::{ObjectResponse, PaginationConfig}; use
cosmwasm_std::{Addr, StdError, StdResult, Uint128}; use cw_storage_plus::{Index, IndexList, IndexedMap, Item, Map, MultiIndex}; +use enum_iterator::all; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -29,6 +31,8 @@ pub struct Bucket { pub struct BucketStat { /// The total size of the objects contained in the bucket. pub size: Uint128, + /// The total size of the objects contained in the bucket after compression. + pub compressed_size: Uint128, /// The number of objects in the bucket. pub object_count: Uint128, } @@ -54,6 +58,7 @@ impl Bucket { pagination, stat: BucketStat { size: Uint128::zero(), + compressed_size: Uint128::zero(), object_count: Uint128::zero(), }, }) @@ -106,6 +111,30 @@ impl From<HashAlgorithm> for msg::HashAlgorithm { } } +impl From<msg::CompressionAlgorithm> for CompressionAlgorithm { + fn from(algorithm: msg::CompressionAlgorithm) -> Self { + match algorithm { + msg::CompressionAlgorithm::Passthrough => CompressionAlgorithm::Passthrough, + msg::CompressionAlgorithm::Lz4 => CompressionAlgorithm::Lz4, + } + } +} + +impl From<CompressionAlgorithm> for msg::CompressionAlgorithm { + fn from(algorithm: CompressionAlgorithm) -> Self { + match algorithm { + CompressionAlgorithm::Passthrough => msg::CompressionAlgorithm::Passthrough, + CompressionAlgorithm::Lz4 => msg::CompressionAlgorithm::Lz4, + } + } +} + +impl CompressionAlgorithm { + pub fn values() -> Vec<CompressionAlgorithm> { + all::<CompressionAlgorithm>().collect::<Vec<CompressionAlgorithm>>() + } +} + /// BucketConfig is the type of the configuration of a bucket. /// /// The configuration is set at the instantiation of the bucket, and is immutable and cannot be changed. @@ -153,6 +182,8 @@ pub struct BucketLimits { pub max_object_size: Option<Uint128>, /// The maximum number of pins in the bucket for an object. pub max_object_pins: Option<Uint128>, + /// The accepted compression algorithms for the objects in the bucket. + pub accepted_compression_algorithms: Vec<CompressionAlgorithm>, } impl From<msg::BucketLimits> for BucketLimits { @@ -162,6 +193,10 @@ impl From<msg::BucketLimits> for BucketLimits { max_objects: limits.max_objects, max_object_size: limits.max_object_size, max_object_pins: limits.max_object_pins, + accepted_compression_algorithms: limits + .accepted_compression_algorithms + .map(|it| it.into_iter().map(|a| a.into()).collect::<Vec<CompressionAlgorithm>>()) + .unwrap_or_else(CompressionAlgorithm::values), } } } @@ -173,6 +208,13 @@ impl From<BucketLimits> for msg::BucketLimits { max_objects: limits.max_objects, max_object_size: limits.max_object_size, max_object_pins: limits.max_object_pins, + accepted_compression_algorithms: Some( + limits + .accepted_compression_algorithms + .into_iter() + .map(|a| a.into()) + .collect::<Vec<msg::CompressionAlgorithm>>(), + ), } } } @@ -241,6 +283,10 @@ pub struct Object { pub size: Uint128, /// The number of pin on this object. pub pin_count: Uint128, + /// The compression algorithm used to compress the object. + pub compression: CompressionAlgorithm, + /// The size of the object after compression. + pub compressed_size: Uint128, } impl From<&Object> for ObjectResponse { @@ -250,6 +296,8 @@ impl From<&Object> for ObjectResponse { size: object.size, owner: object.owner.clone().into(), is_pinned: object.pin_count > Uint128::zero(), + compressed_size: object.compressed_size, + compression_algorithm: object.compression.into(), } } }
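
Note (not part of the patch): the `crate::compress` module referenced throughout this diff is not among the changed files above. For review context, here is a minimal sketch of the shape the call sites assume, namely a `CompressionAlgorithm` enum with `compress`/`decompress`, a `CompressionError::Error(String)` variant, and an `enum_iterator::Sequence` derive so that `values()` in state.rs works. The `lz4_flex` dependency, the derive list, and the `Box<[u8]>` return type are assumptions inferred from the call sites (`.to_vec()`, `.len()`, `.into_vec()`), not the actual implementation:

    use enum_iterator::Sequence;
    use schemars::JsonSchema;
    use serde::{Deserialize, Serialize};
    use thiserror::Error;

    #[derive(Serialize, Deserialize, Clone, Copy, Debug, Eq, PartialEq, JsonSchema, Sequence)]
    pub enum CompressionAlgorithm {
        Passthrough,
        Lz4,
    }

    #[derive(Error, Debug, Eq, PartialEq)]
    pub enum CompressionError {
        #[error("{0}")]
        Error(String),
    }

    impl CompressionAlgorithm {
        /// Compresses the data, returning it untouched for `Passthrough`.
        pub fn compress(&self, data: &[u8]) -> Result<Box<[u8]>, CompressionError> {
            match self {
                CompressionAlgorithm::Passthrough => Ok(data.to_vec().into_boxed_slice()),
                CompressionAlgorithm::Lz4 => {
                    Ok(lz4_flex::compress_prepend_size(data).into_boxed_slice())
                }
            }
        }

        /// Decompresses data previously produced by `compress`.
        pub fn decompress(&self, data: &[u8]) -> Result<Box<[u8]>, CompressionError> {
            match self {
                CompressionAlgorithm::Passthrough => Ok(data.to_vec().into_boxed_slice()),
                CompressionAlgorithm::Lz4 => lz4_flex::decompress_size_prepended(data)
                    .map(Vec::into_boxed_slice)
                    .map_err(|e| CompressionError::Error(e.to_string())),
            }
        }
    }

The `From<CompressionError> for ContractError` conversion added in error.rs above is what lets `store_object` and `query::data` propagate these errors with `?`.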
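
Also for illustration (not part of the patch): storing a pinned, LZ4-compressed object with the new message shape, written as it would appear inside the contract crate (for example in the unit tests above). The helper name and payload are illustrative only:

    use cosmwasm_std::Binary;

    use crate::msg::{CompressionAlgorithm, ExecuteMsg};

    fn store_compressed_example() -> ExecuteMsg {
        ExecuteMsg::StoreObject {
            // "hello world", base64-encoded
            data: Binary::from_base64("aGVsbG8gd29ybGQ=").unwrap(),
            pin: true,
            // Some(...) must be one of the bucket's accepted_compression_algorithms;
            // None lets the contract pick the bucket's first accepted algorithm,
            // falling back to Passthrough (see store_object above).
            compression_algorithm: Some(CompressionAlgorithm::Lz4),
        }
    }

The message is then dispatched through `execute` exactly as in the tests above, and the stored object reports both `size` and `compressed_size` (plus `compression_algorithm`) in the `Object` query response.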