-
Notifications
You must be signed in to change notification settings - Fork 707
/
lib.rs
288 lines (242 loc) · 9.5 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! This pallet used to implement a message queue for downward messages from the relay-chain.
//!
//! It is now deprecated and has been refactored to simply drain any remaining messages into
//! something implementing `HandleMessage`. It steps through the variants of [`MigrationState`]
//! one at a time, in the order in which they are listed in the source code. The pallet can be
//! removed from the runtime once the `Completed` event has been emitted.
#![cfg_attr(not(feature = "std"), no_std)]
#![allow(deprecated)] // The pallet itself is deprecated.
use migration::*;
pub use pallet::*;
mod benchmarking;
mod migration;
mod mock;
mod tests;
pub mod weights;
pub use weights::WeightInfo;
/// The maximal length of a DMP message.
///
/// This is the `MaxMessageLen` associated type of the
/// [`HandleMessage`](frame_support::traits::HandleMessage) implementation that the runtime
/// configures as [`Config::DmpSink`].
pub type MaxDmpMessageLenOf<T> =
<<T as Config>::DmpSink as frame_support::traits::HandleMessage>::MaxMessageLen;
#[frame_support::pallet]
#[deprecated(
note = "`cumulus-pallet-dmp-queue` will be removed after November 2024. It can be removed once its lazy migration completed. See <https://github.com/paritytech/polkadot-sdk/pull/1246>."
)]
pub mod pallet {
use super::*;
use frame_support::{pallet_prelude::*, traits::HandleMessage, weights::WeightMeter};
use frame_system::pallet_prelude::*;
use sp_io::hashing::twox_128;
/// The storage version that this pallet declares through `#[pallet::storage_version]`.
const STORAGE_VERSION: StorageVersion = StorageVersion::new(2);
// The pallet type. It exposes no calls; all of its work happens in the `Hooks::on_idle`
// implementation further down in this module.
#[pallet::pallet]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet<T>(_);
/// Runtime configuration of this pallet.
#[pallet::config]
pub trait Config: frame_system::Config {
/// The overarching event type of the runtime.
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
/// The sink for all DMP messages that the lazy migration will use.
///
/// Its `MaxMessageLen` associated type is re-exported as `MaxDmpMessageLenOf`.
type DmpSink: HandleMessage;
/// Weight info for this pallet (only needed for the lazy migration).
///
/// `on_idle` charges the maximum of `on_idle_good_msg` and `on_idle_large_msg` per step.
type WeightInfo: WeightInfo;
}
/// The migration state of this pallet.
///
/// Declared as a `ValueQuery`, so reading it before anything was written yields
/// `MigrationState::default()`, which is `MigrationState::NotStarted`.
#[pallet::storage]
pub type MigrationStatus<T> = StorageValue<_, MigrationState, ValueQuery>;
/// The lazy-migration state of the pallet.
///
/// `on_idle` advances through these variants one step per invocation, in the order in which
/// they are listed here.
// NOTE(review): `Encode`/`Decode` are derived, so presumably the variant order is part of the
// stored representation and must not be changed - confirm before reordering.
#[derive(
codec::Encode, codec::Decode, Debug, PartialEq, Eq, Clone, MaxEncodedLen, TypeInfo,
)]
pub enum MigrationState {
/// Migration has not started yet.
NotStarted,
/// The export of pages started.
StartedExport {
/// The next page that should be exported.
next_begin_used: PageCounter,
},
/// The page export completed.
CompletedExport,
/// The export of overweight messages started.
StartedOverweightExport {
/// The next overweight index that should be exported.
next_overweight_index: u64,
},
/// The export of overweight messages completed.
CompletedOverweightExport,
/// The storage cleanup started.
StartedCleanup {
/// The cursor handed back by the previous `clear_prefix` step, or `None` if no step
/// has run yet. It is passed to the next `clear_prefix` call.
cursor: Option<BoundedVec<u8, ConstU32<1024>>> },
/// The migration finished. The pallet can now be removed from the runtime.
Completed,
}
impl Default for MigrationState {
fn default() -> Self {
Self::NotStarted
}
}
/// Events deposited by `on_idle` while it advances the lazy migration.
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
/// The export of pages started.
StartedExport,
/// The export of a page completed.
Exported { page: PageCounter },
/// The export of a page failed.
///
/// This should never be emitted.
ExportFailed { page: PageCounter },
/// The export of pages completed.
CompletedExport,
/// The export of overweight messages started.
StartedOverweightExport,
/// The export of an overweight message completed.
ExportedOverweight { index: OverweightIndex },
/// The export of an overweight message failed.
///
/// This should never be emitted.
ExportOverweightFailed { index: OverweightIndex },
/// The export of overweight messages completed.
CompletedOverweightExport,
/// The cleanup of remaining pallet storage started.
StartedCleanup,
/// Some debris was cleaned up.
///
/// `keys_removed` is taken from the `backend` field of the `clear_prefix` result.
CleanedSome { keys_removed: u32 },
/// The cleanup of remaining pallet storage completed.
///
/// `error` is `true` when the cleanup stopped because the cursor returned by
/// `clear_prefix` could not be converted into the bounded cursor type.
Completed { error: bool },
}
// This pallet has no dispatchable calls; the `impl` block is intentionally empty.
#[pallet::call]
impl<T: Config> Pallet<T> {}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
/// Sanity-check the configured weights: the worst-case `on_idle` weight must be non-zero
/// and must fit into the maximal block weight.
fn integrity_test() {
    let worst_case = Self::on_idle_weight();
    assert!(worst_case != Weight::zero());

    let max_block = T::BlockWeights::get().max_block;
    assert!(worst_case.all_lte(max_block));
}
/// Advance the lazy migration by at most one step.
///
/// The worst-case step weight ([`Pallet::on_idle_weight`]) is consumed up front. If `limit`
/// does not cover it, nothing is done. Otherwise exactly one transition of
/// [`MigrationState`] is executed:
///
/// - `NotStarted` / `CompletedExport` / `CompletedOverweightExport` initialise the next phase.
/// - `StartedExport` migrates one page, or finishes the phase once `end_used` is reached.
/// - `StartedOverweightExport` migrates one overweight message, or finishes the phase once
///   `overweight_count` is reached.
/// - `StartedCleanup` clears a bounded number of keys under the pallet prefix.
/// - `Completed` does nothing.
///
/// Returns the weight recorded by the meter, i.e. zero if the step was skipped and the
/// worst-case step weight otherwise (also in the `Completed` state).
fn on_idle(now: BlockNumberFor<T>, limit: Weight) -> Weight {
    let mut meter = WeightMeter::with_limit(limit);
    // Compute the worst-case weight once; it is needed for both the check and the log.
    let required = Self::on_idle_weight();

    if meter.try_consume(required).is_err() {
        // The available `limit` is smaller than what one step requires.
        log::debug!(target: LOG, "Not enough weight for on_idle. {} < {}", limit, required);
        return meter.consumed()
    }

    let state = MigrationStatus::<T>::get();
    let index = PageIndex::<T>::get();
    log::debug!(target: LOG, "on_idle: block={:?}, state={:?}, index={:?}", now, state, index);

    match state {
        MigrationState::NotStarted => {
            log::debug!(target: LOG, "Init export at page {}", index.begin_used);
            MigrationStatus::<T>::put(MigrationState::StartedExport {
                next_begin_used: index.begin_used,
            });
            Self::deposit_event(Event::StartedExport);
        },
        MigrationState::StartedExport { next_begin_used } => {
            log::debug!(target: LOG, "Exporting page {}", next_begin_used);

            if next_begin_used == index.end_used {
                MigrationStatus::<T>::put(MigrationState::CompletedExport);
                log::debug!(target: LOG, "CompletedExport");
                Self::deposit_event(Event::CompletedExport);
            } else {
                let res = migration::migrate_page::<T>(next_begin_used);

                // Move on to the next page regardless of the outcome, so that a failing
                // page cannot stall the migration.
                MigrationStatus::<T>::put(MigrationState::StartedExport {
                    next_begin_used: next_begin_used.saturating_add(1),
                });

                if res.is_ok() {
                    log::debug!(target: LOG, "Exported page {}", next_begin_used);
                    Self::deposit_event(Event::Exported { page: next_begin_used });
                } else {
                    Self::deposit_event(Event::ExportFailed { page: next_begin_used });
                }
            }
        },
        MigrationState::CompletedExport => {
            log::debug!(target: LOG, "Init export overweight at index 0");
            MigrationStatus::<T>::put(MigrationState::StartedOverweightExport {
                next_overweight_index: 0,
            });
            Self::deposit_event(Event::StartedOverweightExport);
        },
        MigrationState::StartedOverweightExport { next_overweight_index } => {
            log::debug!(target: LOG, "Exporting overweight index {}", next_overweight_index);

            if next_overweight_index == index.overweight_count {
                MigrationStatus::<T>::put(MigrationState::CompletedOverweightExport);
                log::debug!(target: LOG, "CompletedOverweightExport");
                Self::deposit_event(Event::CompletedOverweightExport);
            } else {
                let res = migration::migrate_overweight::<T>(next_overweight_index);

                // As for pages: always advance, even if this message failed to export.
                MigrationStatus::<T>::put(MigrationState::StartedOverweightExport {
                    next_overweight_index: next_overweight_index.saturating_add(1),
                });

                if res.is_ok() {
                    log::debug!(target: LOG, "Exported overweight index {next_overweight_index}");
                    Self::deposit_event(Event::ExportedOverweight {
                        index: next_overweight_index,
                    });
                } else {
                    Self::deposit_event(Event::ExportOverweightFailed {
                        index: next_overweight_index,
                    });
                }
            }
        },
        MigrationState::CompletedOverweightExport => {
            log::debug!(target: LOG, "Init cleanup");
            MigrationStatus::<T>::put(MigrationState::StartedCleanup { cursor: None });
            Self::deposit_event(Event::StartedCleanup);
        },
        MigrationState::StartedCleanup { cursor } => {
            log::debug!(target: LOG, "Cleaning up");
            let hashed_prefix =
                twox_128(<Pallet<T> as PalletInfoAccess>::name().as_bytes());

            let result = frame_support::storage::unhashed::clear_prefix(
                &hashed_prefix,
                Some(2), // Somehow it does nothing when set to 1, so we set it to 2.
                cursor.as_ref().map(|c| c.as_ref()),
            );
            Self::deposit_event(Event::CleanedSome { keys_removed: result.backend });

            // GOTCHA! The prefix covers *all* pallet storage; hence we may also have
            // deleted our own `MigrationStatus` value. BUT we insert it back in every
            // branch below:
            if let Some(unbound_cursor) = result.maybe_cursor {
                if let Ok(cursor) = unbound_cursor.try_into() {
                    log::debug!(target: LOG, "Next cursor: {:?}", &cursor);
                    MigrationStatus::<T>::put(MigrationState::StartedCleanup {
                        cursor: Some(cursor),
                    });
                } else {
                    // The cursor does not fit into the bounded type, so the cleanup cannot
                    // be resumed. Give up and report the error through the event.
                    MigrationStatus::<T>::put(MigrationState::Completed);
                    log::error!(target: LOG, "Completed with error: could not bound cursor");
                    Self::deposit_event(Event::Completed { error: true });
                }
            } else {
                // No cursor returned: nothing is left under the prefix.
                MigrationStatus::<T>::put(MigrationState::Completed);
                log::debug!(target: LOG, "Completed");
                Self::deposit_event(Event::Completed { error: false });
            }
        },
        MigrationState::Completed => {
            log::debug!(target: LOG, "Idle; you can remove this pallet");
        },
    }

    meter.consumed()
}
}
impl<T: Config> Pallet<T> {
    /// The worst-case weight of [`Self::on_idle`].
    ///
    /// This is the maximum of the benchmarked "good message" and "large message" weights.
    pub fn on_idle_weight() -> Weight {
        let good_msg = <T as crate::Config>::WeightInfo::on_idle_good_msg();
        let large_msg = <T as crate::Config>::WeightInfo::on_idle_large_msg();
        good_msg.max(large_msg)
    }
}
}