From e2ff7e8645b34d1f14b6bbf91ac7bafdf93fc08f Mon Sep 17 00:00:00 2001
From: nanocryk <6422796+nanocryk@users.noreply.github.com>
Date: Wed, 3 Jan 2024 11:52:53 +0100
Subject: [PATCH 01/40] pallet layout
---
Cargo.lock | 23 ++++
pallets/stream-payment/Cargo.toml | 34 ++++++
pallets/stream-payment/src/lib.rs | 178 ++++++++++++++++++++++++++++++
3 files changed, 235 insertions(+)
create mode 100644 pallets/stream-payment/Cargo.toml
create mode 100644 pallets/stream-payment/src/lib.rs
diff --git a/Cargo.lock b/Cargo.lock
index 953645777..ba3532b19 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8939,6 +8939,29 @@ dependencies = [
"sp-std",
]
+[[package]]
+name = "pallet-stream-payment"
+version = "0.1.0"
+dependencies = [
+ "dp-core",
+ "frame-benchmarking",
+ "frame-support",
+ "frame-system",
+ "log",
+ "num-traits",
+ "pallet-balances",
+ "parity-scale-codec",
+ "scale-info",
+ "serde",
+ "similar-asserts",
+ "sp-core",
+ "sp-io",
+ "sp-runtime",
+ "sp-std",
+ "tp-maths",
+ "tp-traits",
+]
+
[[package]]
name = "pallet-sudo"
version = "4.0.0-dev"
diff --git a/pallets/stream-payment/Cargo.toml b/pallets/stream-payment/Cargo.toml
new file mode 100644
index 000000000..b4bf77aa5
--- /dev/null
+++ b/pallets/stream-payment/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "pallet-stream-payment"
+authors = { workspace = true }
+description = "Stream payment pallet"
+edition = "2021"
+license = "GPL-3.0-only"
+version = "0.1.0"
+
+[package.metadata.docs.rs]
+targets = [ "x86_64-unknown-linux-gnu" ]
+
+[dependencies]
+log = { workspace = true }
+serde = { workspace = true, optional = true }
+
+dp-core = { workspace = true }
+tp-maths = { workspace = true }
+tp-traits = { workspace = true }
+
+# Substrate
+frame-benchmarking = { workspace = true, optional = true }
+frame-support = { workspace = true }
+frame-system = { workspace = true }
+parity-scale-codec = { workspace = true }
+scale-info = { workspace = true }
+sp-core = { workspace = true }
+sp-runtime = { workspace = true }
+sp-std = { workspace = true }
+
+[dev-dependencies]
+num-traits = { workspace = true }
+pallet-balances = { workspace = true, features = [ "std" ] }
+similar-asserts = { workspace = true }
+sp-io = { workspace = true, features = [ "std" ] }
diff --git a/pallets/stream-payment/src/lib.rs b/pallets/stream-payment/src/lib.rs
new file mode 100644
index 000000000..2d16ae066
--- /dev/null
+++ b/pallets/stream-payment/src/lib.rs
@@ -0,0 +1,178 @@
+// Copyright (C) Moondance Labs Ltd.
+// This file is part of Tanssi.
+
+// Tanssi is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Tanssi is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Tanssi. If not, see
+
+#![cfg_attr(not(feature = "std"), no_std)]
+
+use {
+ frame_support::{
+ pallet,
+ pallet_prelude::*,
+ storage::types::{StorageDoubleMap, StorageMap},
+ traits::{fungibles, tokens::Balance},
+ Blake2_128Concat,
+ },
+ frame_system::pallet_prelude::*,
+ parity_scale_codec::{FullCodec, MaxEncodedLen},
+ scale_info::TypeInfo,
+ sp_std::{fmt::Debug, marker::PhantomData},
+};
+
+/// Type able to provide the current time for given unit.
+pub trait TimeProvider {
+ fn now(unit: Unit) -> Option;
+}
+
+#[pallet(dev_mode)]
+pub mod pallet {
+ use super::*;
+
+ /// Pooled Staking pallet.
+ #[pallet::pallet]
+ #[pallet::without_storage_info]
+ pub struct Pallet(PhantomData);
+
+ #[pallet::config]
+ pub trait Config: frame_system::Config {
+ /// Represents which units of time can be used. Designed to be an enum
+ /// with a variant for each kind of time source/scale supported.
+ type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
+
+ /// The balance type, which is also the type representing time (as this
+ /// pallet will do math with both time and balances to compute how
+ /// much should be paid).
+ type Balance: Balance;
+
+ /// The currencies type, supporting multiple currencies.
+ type Currencies: fungibles::Inspect;
+
+ /// Provide the current time in given unit.
+ type TimeProvider: TimeProvider;
+ }
+
+ pub type StreamId = u64;
+ type AccountIdOf = ::AccountId;
+ type AssetIdOf = <::Currencies as fungibles::Inspect>>::AssetId;
+
+ /// A stream payment from source to target.
+ /// Stores the last time the stream was updated, which allows to compute
+ /// elapsed time and perform payment.
+ #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
+ #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Clone, TypeInfo)]
+ pub struct Stream {
+ source: AccountId,
+ target: AccountId,
+ time_unit: Unit,
+ asset_id: AssetId,
+ rate_per_time_unit: Balance,
+ locked_funds: Balance,
+ last_time_updated: Balance,
+ }
+
+ pub type StreamOf =
+ Stream, ::TimeUnit, AssetIdOf, ::Balance>;
+
+ /// Store the next available stream id.
+ #[pallet::storage]
+ pub type NextStreamId = StorageValue;
+
+ /// Store each stream indexed by an Id.
+ #[pallet::storage]
+ pub type Streams = StorageMap<
+ Hasher = Blake2_128Concat,
+ Key = StreamId,
+ Value = StreamOf,
+ QueryKind = OptionQuery,
+ >;
+
+ /// Lookup for all streams with given source.
+ /// To avoid maintaining a growing list of stream ids, they are stored in
+ /// the form of an entry (AccountId, StreamId). If such entry exists then
+ /// this AccountId is a source in StreamId. One can iterate over all storage
+ /// keys starting with the AccountId to find all StreamIds.
+ #[pallet::storage]
+ pub type LookupStreamsWithSource = StorageDoubleMap<
+ Key1 = AccountIdOf,
+ Hasher1 = Blake2_128Concat,
+ Key2 = StreamId,
+ Hasher2 = Blake2_128Concat,
+ Value = (),
+ QueryKind = OptionQuery,
+ >;
+
+ /// Lookup for all streams with given target.
+ /// To avoid maintaining a growing list of stream ids, they are stored in
+ /// the form of an entry (AccountId, StreamId). If such entry exists then
+ /// this AccountId is a target in StreamId. One can iterate over all storage
+ /// keys starting with the AccountId to find all StreamIds.
+ #[pallet::storage]
+ pub type LookupStreamsWithTarget = StorageDoubleMap<
+ Key1 = AccountIdOf,
+ Hasher1 = Blake2_128Concat,
+ Key2 = StreamId,
+ Hasher2 = Blake2_128Concat,
+ Value = (),
+ QueryKind = OptionQuery,
+ >;
+
+ #[pallet::call]
+ impl Pallet {
+ #[pallet::call_index(0)]
+ pub fn open_stream(
+ _origin: OriginFor,
+ _target: AccountIdOf,
+ _time_unit: T::TimeUnit,
+ _asset_id: AssetIdOf,
+ _rate_per_time_unit: T::Balance,
+ _initial_deposit: T::Balance,
+ ) -> DispatchResultWithPostInfo {
+ todo!()
+ }
+
+ #[pallet::call_index(1)]
+ pub fn close_stream(
+ _origin: OriginFor,
+ _stream: StreamId,
+ ) -> DispatchResultWithPostInfo {
+ todo!()
+ }
+
+ #[pallet::call_index(2)]
+ pub fn update_stream(
+ _origin: OriginFor,
+ _stream: StreamId,
+ ) -> DispatchResultWithPostInfo {
+ todo!()
+ }
+
+ #[pallet::call_index(3)]
+ pub fn refill_stream(
+ _origin: OriginFor,
+ _stream: StreamId,
+ _new_deposit: T::Balance,
+ ) -> DispatchResultWithPostInfo {
+ todo!()
+ }
+
+ #[pallet::call_index(4)]
+ pub fn change_stream_rate(
+ _origin: OriginFor,
+ _stream: StreamId,
+ _new_rate_per_time_unit: T::Balance,
+ ) -> DispatchResultWithPostInfo {
+ todo!()
+ }
+ }
+}
From 6c4faadfa0762f438f9a0100b0adc0cd9c06e23e Mon Sep 17 00:00:00 2001
From: nanocryk <6422796+nanocryk@users.noreply.github.com>
Date: Thu, 4 Jan 2024 11:04:49 +0100
Subject: [PATCH 02/40] open stream + update logic
---
pallets/stream-payment/src/lib.rs | 178 ++++++++++++++++++++++++++----
1 file changed, 157 insertions(+), 21 deletions(-)
diff --git a/pallets/stream-payment/src/lib.rs b/pallets/stream-payment/src/lib.rs
index 2d16ae066..312b7a713 100644
--- a/pallets/stream-payment/src/lib.rs
+++ b/pallets/stream-payment/src/lib.rs
@@ -21,18 +21,24 @@ use {
pallet,
pallet_prelude::*,
storage::types::{StorageDoubleMap, StorageMap},
- traits::{fungibles, tokens::Balance},
+ traits::{
+ fungibles::{self, Mutate as _, MutateFreeze as _},
+ tokens::{Balance, Preservation},
+ },
Blake2_128Concat,
},
frame_system::pallet_prelude::*,
parity_scale_codec::{FullCodec, MaxEncodedLen},
scale_info::TypeInfo,
+ sp_runtime::traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedMul, CheckedSub, One, Zero},
sp_std::{fmt::Debug, marker::PhantomData},
};
/// Type able to provide the current time for given unit.
+/// For each unit the returned number should monotonically increase and not
+/// overflow.
pub trait TimeProvider {
- fn now(unit: Unit) -> Option;
+ fn now(unit: &Unit) -> Option;
}
#[pallet(dev_mode)]
@@ -46,6 +52,16 @@ pub mod pallet {
#[pallet::config]
pub trait Config: frame_system::Config {
+ /// Type used to represent stream ids. Should be large enough to not overflow.
+ type StreamId: AtLeast32BitUnsigned
+ + Default
+ + Debug
+ + Copy
+ + Clone
+ + FullCodec
+ + TypeInfo
+ + MaxEncodedLen;
+
/// Represents which units of time can be used. Designed to be an enum
/// with a variant for each kind of time source/scale supported.
type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
@@ -55,14 +71,19 @@ pub mod pallet {
/// much should be paid).
type Balance: Balance;
+ /// LockId type used by `Currencies`.
+ type LockId: From;
+
/// The currencies type, supporting multiple currencies.
- type Currencies: fungibles::Inspect;
+ type Currencies: fungibles::Inspect
+ + fungibles::InspectFreeze
+ + fungibles::Mutate
+ + fungibles::MutateFreeze;
/// Provide the current time in given unit.
type TimeProvider: TimeProvider;
}
- pub type StreamId = u64;
type AccountIdOf = ::AccountId;
type AssetIdOf = <::Currencies as fungibles::Inspect>>::AssetId;
@@ -86,13 +107,13 @@ pub mod pallet {
/// Store the next available stream id.
#[pallet::storage]
- pub type NextStreamId = StorageValue;
+ pub type NextStreamId = StorageValue;
/// Store each stream indexed by an Id.
#[pallet::storage]
pub type Streams = StorageMap<
Hasher = Blake2_128Concat,
- Key = StreamId,
+ Key = T::StreamId,
Value = StreamOf,
QueryKind = OptionQuery,
>;
@@ -106,7 +127,7 @@ pub mod pallet {
pub type LookupStreamsWithSource = StorageDoubleMap<
Key1 = AccountIdOf,
Hasher1 = Blake2_128Concat,
- Key2 = StreamId,
+ Key2 = T::StreamId,
Hasher2 = Blake2_128Concat,
Value = (),
QueryKind = OptionQuery,
@@ -121,46 +142,101 @@ pub mod pallet {
pub type LookupStreamsWithTarget = StorageDoubleMap<
Key1 = AccountIdOf,
Hasher1 = Blake2_128Concat,
- Key2 = StreamId,
+ Key2 = T::StreamId,
Hasher2 = Blake2_128Concat,
Value = (),
QueryKind = OptionQuery,
>;
+ #[pallet::error]
+ pub enum Error {
+ UnknownStreamId,
+ StreamIdOverflow,
+ CantBeBothSourceAndTarget,
+ CantFetchCurrentTime,
+ TimeOverflow,
+ CurrencyOverflow,
+ }
+
+ #[pallet::composite_enum]
+ pub enum LockId {
+ StreamPayment,
+ }
+
#[pallet::call]
impl Pallet {
#[pallet::call_index(0)]
pub fn open_stream(
- _origin: OriginFor,
- _target: AccountIdOf,
- _time_unit: T::TimeUnit,
- _asset_id: AssetIdOf,
- _rate_per_time_unit: T::Balance,
- _initial_deposit: T::Balance,
+ origin: OriginFor,
+ target: AccountIdOf,
+ time_unit: T::TimeUnit,
+ asset_id: AssetIdOf,
+ rate_per_time_unit: T::Balance,
+ initial_deposit: T::Balance,
) -> DispatchResultWithPostInfo {
- todo!()
+ let origin = ensure_signed(origin)?;
+ ensure!(origin != target, Error::::CantBeBothSourceAndTarget);
+
+ let stream_id = NextStreamId::::get();
+ let next_stream_id = stream_id
+ .checked_add(&One::one())
+ .ok_or(Error::::StreamIdOverflow)?;
+ NextStreamId::::set(next_stream_id);
+
+ T::Currencies::increase_frozen(
+ asset_id.clone(),
+ &LockId::StreamPayment.into(),
+ &origin,
+ initial_deposit,
+ )?;
+
+ let now = T::TimeProvider::now(&time_unit).ok_or(Error::::CantFetchCurrentTime)?;
+ let stream = Stream {
+ source: origin.clone(),
+ target: target.clone(),
+ time_unit,
+ asset_id,
+ rate_per_time_unit,
+ locked_funds: initial_deposit,
+ last_time_updated: now,
+ };
+
+ Streams::::insert(stream_id, stream);
+ LookupStreamsWithSource::::insert(origin, stream_id, ());
+ LookupStreamsWithTarget::::insert(target, stream_id, ());
+
+ Ok(().into())
}
#[pallet::call_index(1)]
pub fn close_stream(
_origin: OriginFor,
- _stream: StreamId,
+ _stream_id: T::StreamId,
) -> DispatchResultWithPostInfo {
todo!()
}
#[pallet::call_index(2)]
pub fn update_stream(
- _origin: OriginFor,
- _stream: StreamId,
+ origin: OriginFor,
+ stream_id: T::StreamId,
) -> DispatchResultWithPostInfo {
- todo!()
+ // No problem with anyone updating any stream.
+ let _ = ensure_signed(origin)?;
+
+ let mut stream = Streams::::get(stream_id).ok_or(Error::::UnknownStreamId)?;
+ Self::perform_stream_payment(&mut stream)?;
+ Streams::::insert(stream_id, stream);
+
+ // TODO: Event here or in do_update_stream?
+
+ Ok(().into())
}
#[pallet::call_index(3)]
pub fn refill_stream(
_origin: OriginFor,
- _stream: StreamId,
+ _stream_id: T::StreamId,
_new_deposit: T::Balance,
) -> DispatchResultWithPostInfo {
todo!()
@@ -169,10 +245,70 @@ pub mod pallet {
#[pallet::call_index(4)]
pub fn change_stream_rate(
_origin: OriginFor,
- _stream: StreamId,
+ _stream_id: T::StreamId,
_new_rate_per_time_unit: T::Balance,
) -> DispatchResultWithPostInfo {
todo!()
}
}
+
+ impl Pallet {
+ /// Behavior:
+ /// A stream payment consist of a locked deposit, a rate per unit of time and the
+ /// last time the stream was updated. When updating the stream, **at most**
+ /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
+ /// account. If this amount is greater than the left deposit, the stream is considered
+ /// drained **but not closed**. The source can come back later and refill the stream,
+ /// however there will be no retroactive payment for the time spent as drained.
+ /// If the stream payment is used to rent a service, the target should pause the service
+ /// while the stream is drained, and resume it once it is refilled.
+ fn perform_stream_payment(stream: &mut StreamOf) -> DispatchResultWithPostInfo {
+ let now =
+ T::TimeProvider::now(&stream.time_unit).ok_or(Error::::CantFetchCurrentTime)?;
+
+ if stream.locked_funds.is_zero() {
+ stream.last_time_updated = now;
+ return Ok(().into());
+ }
+
+ let delta = now
+ .checked_sub(&stream.last_time_updated)
+ .ok_or(Error::::TimeOverflow)?;
+ let mut payment = delta
+ .checked_mul(&stream.rate_per_time_unit)
+ .ok_or(Error::::CurrencyOverflow)?;
+
+ // We compute the new amount of locked funds. If it underflows it
+ // means that there is more to pay that what is left, in which case
+ // we pay all that is left.
+ let new_locked = match stream.locked_funds.checked_sub(&payment) {
+ Some(v) => v,
+ None => {
+ payment = stream.locked_funds;
+ Zero::zero()
+ }
+ };
+
+ T::Currencies::decrease_frozen(
+ stream.asset_id.clone(),
+ &LockId::StreamPayment.into(),
+ &stream.source,
+ payment,
+ )?;
+ T::Currencies::transfer(
+ stream.asset_id.clone(),
+ &stream.source,
+ &stream.target,
+ payment,
+ Preservation::Preserve,
+ )?;
+
+ stream.last_time_updated = now;
+ stream.locked_funds = new_locked;
+
+ // TODO: Emit event here?
+
+ Ok(().into())
+ }
+ }
}
From 56dd91403da9f4334fcb68a68dde5250b87718b1 Mon Sep 17 00:00:00 2001
From: nanocryk <6422796+nanocryk@users.noreply.github.com>
Date: Thu, 4 Jan 2024 12:01:40 +0100
Subject: [PATCH 03/40] close stream + events
---
pallets/stream-payment/src/lib.rs | 109 +++++++++++++++++++++++++-----
1 file changed, 92 insertions(+), 17 deletions(-)
diff --git a/pallets/stream-payment/src/lib.rs b/pallets/stream-payment/src/lib.rs
index 312b7a713..1f6cf2da3 100644
--- a/pallets/stream-payment/src/lib.rs
+++ b/pallets/stream-payment/src/lib.rs
@@ -52,6 +52,9 @@ pub mod pallet {
#[pallet::config]
pub trait Config: frame_system::Config {
+ /// Overarching event type
+ type RuntimeEvent: From> + IsType<::RuntimeEvent>;
+
/// Type used to represent stream ids. Should be large enough to not overflow.
type StreamId: AtLeast32BitUnsigned
+ Default
@@ -98,7 +101,7 @@ pub mod pallet {
time_unit: Unit,
asset_id: AssetId,
rate_per_time_unit: Balance,
- locked_funds: Balance,
+ deposit: Balance,
last_time_updated: Balance,
}
@@ -152,12 +155,32 @@ pub mod pallet {
pub enum Error {
UnknownStreamId,
StreamIdOverflow,
+ UnauthorizedOrigin,
CantBeBothSourceAndTarget,
CantFetchCurrentTime,
TimeOverflow,
CurrencyOverflow,
}
+ #[pallet::event]
+ #[pallet::generate_deposit(pub(super) fn deposit_event)]
+ pub enum Event {
+ StreamOpened {
+ stream_id: T::StreamId,
+ },
+ StreamClosed {
+ stream_id: T::StreamId,
+ refunded: T::Balance,
+ },
+ StreamPayment {
+ stream_id: T::StreamId,
+ source: AccountIdOf,
+ target: AccountIdOf,
+ amount: T::Balance,
+ drained: bool,
+ },
+ }
+
#[pallet::composite_enum]
pub enum LockId {
StreamPayment,
@@ -177,12 +200,14 @@ pub mod pallet {
let origin = ensure_signed(origin)?;
ensure!(origin != target, Error::::CantBeBothSourceAndTarget);
+ // Generate a new stream id.
let stream_id = NextStreamId::::get();
let next_stream_id = stream_id
.checked_add(&One::one())
.ok_or(Error::::StreamIdOverflow)?;
NextStreamId::::set(next_stream_id);
+ // Freeze initial deposit.
T::Currencies::increase_frozen(
asset_id.clone(),
&LockId::StreamPayment.into(),
@@ -190,6 +215,7 @@ pub mod pallet {
initial_deposit,
)?;
+ // Create stream data.
let now = T::TimeProvider::now(&time_unit).ok_or(Error::::CantFetchCurrentTime)?;
let stream = Stream {
source: origin.clone(),
@@ -197,23 +223,58 @@ pub mod pallet {
time_unit,
asset_id,
rate_per_time_unit,
- locked_funds: initial_deposit,
+ deposit: initial_deposit,
last_time_updated: now,
};
+ // Insert stream in storage.
Streams::::insert(stream_id, stream);
LookupStreamsWithSource::::insert(origin, stream_id, ());
LookupStreamsWithTarget::::insert(target, stream_id, ());
+ // Emit event.
+ Pallet::::deposit_event(Event::::StreamOpened { stream_id });
+
Ok(().into())
}
#[pallet::call_index(1)]
pub fn close_stream(
- _origin: OriginFor,
- _stream_id: T::StreamId,
+ origin: OriginFor,
+ stream_id: T::StreamId,
) -> DispatchResultWithPostInfo {
- todo!()
+ let origin = ensure_signed(origin)?;
+ let mut stream = Streams::::get(stream_id).ok_or(Error::::UnknownStreamId)?;
+
+ // Only source or target can close a stream.
+ ensure!(
+ origin == stream.source || origin == stream.target,
+ Error::::UnauthorizedOrigin
+ );
+
+ // Update stream before closing it to ensure fair payment.
+ Self::perform_stream_payment(stream_id, &mut stream)?;
+
+ // Unfreeze funds left in the stream.
+ T::Currencies::decrease_frozen(
+ stream.asset_id.clone(),
+ &LockId::StreamPayment.into(),
+ &stream.source,
+ stream.deposit,
+ )?;
+
+ // Remove stream from storage.
+ Streams::::remove(stream_id);
+ LookupStreamsWithSource::::remove(stream.source, stream_id);
+ LookupStreamsWithTarget::::remove(stream.target, stream_id);
+
+ // Emit event.
+ Pallet::::deposit_event(Event::::StreamClosed {
+ stream_id,
+ refunded: stream.deposit,
+ });
+
+ Ok(().into())
}
#[pallet::call_index(2)]
@@ -225,11 +286,9 @@ pub mod pallet {
let _ = ensure_signed(origin)?;
let mut stream = Streams::::get(stream_id).ok_or(Error::::UnknownStreamId)?;
- Self::perform_stream_payment(&mut stream)?;
+ Self::perform_stream_payment(stream_id, &mut stream)?;
Streams::::insert(stream_id, stream);
- // TODO: Event here or in do_update_stream?
-
Ok(().into())
}
@@ -262,11 +321,15 @@ pub mod pallet {
/// however there will be no retroactive payment for the time spent as drained.
/// If the stream payment is used to rent a service, the target should pause the service
/// while the stream is drained, and resume it once it is refilled.
- fn perform_stream_payment(stream: &mut StreamOf) -> DispatchResultWithPostInfo {
+ fn perform_stream_payment(
+ stream_id: T::StreamId,
+ stream: &mut StreamOf,
+ ) -> DispatchResultWithPostInfo {
let now =
T::TimeProvider::now(&stream.time_unit).ok_or(Error::::CantFetchCurrentTime)?;
- if stream.locked_funds.is_zero() {
+ // If deposit is zero the stream is fully drained and there is nothing to transfer.
+ if stream.deposit.is_zero() {
stream.last_time_updated = now;
return Ok(().into());
}
@@ -274,6 +337,9 @@ pub mod pallet {
let delta = now
.checked_sub(&stream.last_time_updated)
.ok_or(Error::::TimeOverflow)?;
+
+ // We compute the amount due to the target according to the rate, which may be
+ // lowered if the stream deposit is lower.
let mut payment = delta
.checked_mul(&stream.rate_per_time_unit)
.ok_or(Error::::CurrencyOverflow)?;
@@ -281,14 +347,15 @@ pub mod pallet {
// We compute the new amount of locked funds. If it underflows it
// means that there is more to pay that what is left, in which case
// we pay all that is left.
- let new_locked = match stream.locked_funds.checked_sub(&payment) {
- Some(v) => v,
+ let (new_locked, drained) = match stream.deposit.checked_sub(&payment) {
+ Some(v) => (v, false),
None => {
- payment = stream.locked_funds;
- Zero::zero()
+ payment = stream.deposit;
+ (Zero::zero(), true)
}
};
+ // Transfer from the source to target.
T::Currencies::decrease_frozen(
stream.asset_id.clone(),
&LockId::StreamPayment.into(),
@@ -303,10 +370,18 @@ pub mod pallet {
Preservation::Preserve,
)?;
+ // Update stream info.
stream.last_time_updated = now;
- stream.locked_funds = new_locked;
-
- // TODO: Emit event here?
+ stream.deposit = new_locked;
+
+ // Emit event.
+ Pallet::::deposit_event(Event::::StreamPayment {
+ stream_id,
+ source: stream.source.clone(),
+ target: stream.target.clone(),
+ amount: payment,
+ drained,
+ });
Ok(().into())
}
From 0f75ed05f27852cc4dcd3d09a9edd0f3cef6da61 Mon Sep 17 00:00:00 2001
From: nanocryk <6422796+nanocryk@users.noreply.github.com>
Date: Fri, 5 Jan 2024 11:05:17 +0100
Subject: [PATCH 04/40] refill + change rate
---
pallets/stream-payment/src/lib.rs | 85 ++++++++++++++++++++++++++++---
1 file changed, 77 insertions(+), 8 deletions(-)
diff --git a/pallets/stream-payment/src/lib.rs b/pallets/stream-payment/src/lib.rs
index 1f6cf2da3..4171a9f7a 100644
--- a/pallets/stream-payment/src/lib.rs
+++ b/pallets/stream-payment/src/lib.rs
@@ -160,6 +160,8 @@ pub mod pallet {
CantFetchCurrentTime,
TimeOverflow,
CurrencyOverflow,
+ SourceCantDecreaseRate,
+ TargetCantIncreaseRate,
}
#[pallet::event]
@@ -179,6 +181,11 @@ pub mod pallet {
amount: T::Balance,
drained: bool,
},
+ StreamRateChanged {
+ stream_id: T::StreamId,
+ old_rate: T::Balance,
+ new_rate: T::Balance,
+ },
}
#[pallet::composite_enum]
@@ -294,20 +301,82 @@ pub mod pallet {
#[pallet::call_index(3)]
pub fn refill_stream(
- _origin: OriginFor,
- _stream_id: T::StreamId,
- _new_deposit: T::Balance,
+ origin: OriginFor,
+ stream_id: T::StreamId,
+ new_deposit: T::Balance,
) -> DispatchResultWithPostInfo {
- todo!()
+ let origin = ensure_signed(origin)?;
+ let mut stream = Streams::::get(stream_id).ok_or(Error::::UnknownStreamId)?;
+
+ // Only source can refill stream
+ ensure!(origin == stream.source, Error::::UnauthorizedOrigin);
+
+ // Source will not pay for drained stream retroactively, so we perform payment with
+ // what is left first.
+ Self::perform_stream_payment(stream_id, &mut stream)?;
+
+ // Increase deposit.
+ T::Currencies::increase_frozen(
+ stream.asset_id.clone(),
+ &LockId::StreamPayment.into(),
+ &origin,
+ new_deposit,
+ )?;
+ stream.deposit = stream
+ .deposit
+ .checked_add(&new_deposit)
+ .ok_or(Error::::CurrencyOverflow)?;
+
+ // Update stream info in storage.
+ Streams::::insert(stream_id, stream);
+
+ Ok(().into())
}
#[pallet::call_index(4)]
pub fn change_stream_rate(
- _origin: OriginFor,
- _stream_id: T::StreamId,
- _new_rate_per_time_unit: T::Balance,
+ origin: OriginFor,
+ stream_id: T::StreamId,
+ new_rate_per_time_unit: T::Balance,
) -> DispatchResultWithPostInfo {
- todo!()
+ let origin = ensure_signed(origin)?;
+ let mut stream = Streams::::get(stream_id).ok_or(Error::::UnknownStreamId)?;
+
+ // Only source or target can update the rate.
+ ensure!(
+ origin == stream.source || origin == stream.target,
+ Error::::UnauthorizedOrigin
+ );
+
+ // Noop
+ if new_rate_per_time_unit == stream.rate_per_time_unit {
+ return Ok(().into());
+ }
+
+ // Ensure rate change is fair.
+ if origin == stream.source && new_rate_per_time_unit < stream.rate_per_time_unit {
+ return Err(Error::::SourceCantDecreaseRate.into());
+ }
+
+ if origin == stream.target && new_rate_per_time_unit > stream.rate_per_time_unit {
+ return Err(Error::::TargetCantIncreaseRate.into());
+ }
+
+ // Perform pending payment before changing rate.
+ Self::perform_stream_payment(stream_id, &mut stream)?;
+
+ // Emit event.
+ Pallet::::deposit_event(Event::::StreamRateChanged {
+ stream_id,
+ old_rate: stream.rate_per_time_unit,
+ new_rate: new_rate_per_time_unit,
+ });
+
+ // Update rate
+ stream.rate_per_time_unit = new_rate_per_time_unit;
+ Streams::::insert(stream_id, stream);
+
+ Ok(().into())
}
}
From 594ca74dadf5013becb1373da909e8e73445f36e Mon Sep 17 00:00:00 2001
From: nanocryk <6422796+nanocryk@users.noreply.github.com>
Date: Mon, 8 Jan 2024 15:04:32 +0100
Subject: [PATCH 05/40] adapter for fungible impl
---
Cargo.toml | 1 +
pallets/stream-payment/src/as_fungibles.rs | 388 +++++++++++++++++++++
pallets/stream-payment/src/lib.rs | 2 +
3 files changed, 391 insertions(+)
create mode 100644 pallets/stream-payment/src/as_fungibles.rs
diff --git a/Cargo.toml b/Cargo.toml
index a78147705..349875964 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ tc-consensus = { path = "client/consensus" }
tp-author-noting-inherent = { path = "primitives/author-noting-inherent", default-features = false }
tp-consensus = { path = "primitives/consensus", default-features = false }
tp-container-chain-genesis-data = { path = "primitives/container-chain-genesis-data", default-features = false }
+tp-fungibles-ext = { path = "primitives/fungibles-ext", default-features = false }
tp-maths = { path = "primitives/maths", default-features = false }
tp-traits = { path = "primitives/traits", default-features = false }
diff --git a/pallets/stream-payment/src/as_fungibles.rs b/pallets/stream-payment/src/as_fungibles.rs
new file mode 100644
index 000000000..5a30e9799
--- /dev/null
+++ b/pallets/stream-payment/src/as_fungibles.rs
@@ -0,0 +1,388 @@
+// Copyright (C) Moondance Labs Ltd.
+// This file is part of Tanssi.
+
+// Tanssi is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Tanssi is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Tanssi. If not, see .
+
+//! Provides a wrapper to adapat a `fungible` into `fungibles`.
+
+// TODO: PR Substrate to include this convertion, which is the opposite of the
+// currently present `ItemOf`. Lacking some redirects due to unaccessible
+// imbalances type conversions.
+// https://github.com/paritytech/polkadot-sdk/pull/2858
+
+use {
+ core::marker::PhantomData,
+ frame_support::{
+ sp_runtime::{DispatchError, DispatchResult},
+ traits::tokens::{
+ fungible, fungibles, DepositConsequence, Fortitude, Precision, Preservation,
+ Provenance, Restriction, WithdrawConsequence,
+ },
+ },
+};
+
+/// Redirects `fungibles` function to the `fungible` equivalent without
+/// the AssetId argument.
+macro_rules! redirect {
+ ( $(
+ fn $fn_name:ident (
+ $(
+ $arg_name:ident : $arg_ty:ty
+ ),* $(,)?
+ ) $(-> $fn_out:ty)?;
+ )+) => {
+ $(
+ fn $fn_name((): Self::AssetId, $($arg_name:$arg_ty),*) $(-> $fn_out)? {
+ F::$fn_name($($arg_name),*)
+ }
+ )+
+ };
+}
+
+pub struct ConvertHandleImbalanceDrop(PhantomData);
+
+impl> fungibles::HandleImbalanceDrop<(), B>
+ for ConvertHandleImbalanceDrop
+{
+ fn handle((): (), amount: B) {
+ H::handle(amount)
+ }
+}
+
+/// A wrapper to use a `fungible` as a `fungibles` with a single asset represented by `()`.
+pub struct AsFungibles(PhantomData<(F, AccountId)>);
+
+impl> fungibles::Inspect
+ for AsFungibles
+{
+ type AssetId = ();
+ type Balance = F::Balance;
+
+ redirect!(
+ fn total_issuance() -> Self::Balance;
+ fn minimum_balance() -> Self::Balance;
+ fn total_balance(who: &AccountId) -> Self::Balance;
+ fn balance(who: &AccountId) -> Self::Balance;
+ fn reducible_balance(
+ who: &AccountId,
+ preservation: Preservation,
+ force: Fortitude,
+ ) -> Self::Balance;
+ fn can_deposit(
+ who: &AccountId,
+ amount: Self::Balance,
+ provenance: Provenance,
+ ) -> DepositConsequence;
+ fn can_withdraw(
+ who: &AccountId,
+ amount: Self::Balance,
+ ) -> WithdrawConsequence;
+ fn active_issuance() -> Self::Balance;
+ );
+
+ fn asset_exists((): Self::AssetId) -> bool {
+ true
+ }
+}
+
+impl> fungibles::Unbalanced
+ for AsFungibles
+{
+ redirect!(
+ fn write_balance(
+ who: &AccountId,
+ amount: Self::Balance,
+ ) -> Result