Skip to content

Commit

Permalink
removes CowCachedExecutors (backport #22343) (#22362)
Browse files Browse the repository at this point in the history
* removes CowCachedExecutors (#22343)

Copy-on-write semantics for cached executors can be implemented by a
simple Arc<CachedExecutors> as opposed to CowCachedExecutors:
https://github.com/solana-labs/solana/blob/f1e2598ba/runtime/src/bank.rs#L244-L247

This will also avoid the need for double locking as in:
https://github.com/solana-labs/solana/blob/f1e2598ba/runtime/src/bank.rs#L3490-L3491
https://github.com/solana-labs/solana/blob/f1e2598ba/runtime/src/bank.rs#L3525-L3526

(cherry picked from commit c2389fc)

# Conflicts:
#	runtime/src/bank.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored Jan 7, 2022
1 parent b30c726 commit 299a59f
Showing 1 changed file with 53 additions and 98 deletions.
151 changes: 53 additions & 98 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,51 +281,6 @@ impl fmt::Debug for Builtin {
}
}

/// Copy-on-write holder of CachedExecutors
#[derive(AbiExample, Debug, Default)]
struct CowCachedExecutors {
shared: bool,
executors: Arc<RwLock<CachedExecutors>>,
}
impl Clone for CowCachedExecutors {
fn clone(&self) -> Self {
Self {
shared: true,
executors: self.executors.clone(),
}
}
}
impl CowCachedExecutors {
fn clone_with_epoch(&self, epoch: u64) -> Self {
let executors_raw = self.read().unwrap();
if executors_raw.current_epoch() == epoch {
self.clone()
} else {
Self {
shared: false,
executors: Arc::new(RwLock::new(executors_raw.clone_with_epoch(epoch))),
}
}
}
fn new(executors: Arc<RwLock<CachedExecutors>>) -> Self {
Self {
shared: true,
executors,
}
}
fn read(&self) -> LockResult<RwLockReadGuard<CachedExecutors>> {
self.executors.read()
}
fn write(&mut self) -> LockResult<RwLockWriteGuard<CachedExecutors>> {
if self.shared {
self.shared = false;
let local_cache = (*self.executors.read().unwrap()).clone();
self.executors = Arc::new(RwLock::new(local_cache));
}
self.executors.write()
}
}

#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl AbiExample for Builtin {
fn example() -> Self {
Expand Down Expand Up @@ -364,8 +319,8 @@ impl Default for CachedExecutors {
fn default() -> Self {
Self {
max: MAX_CACHED_EXECUTORS,
current_epoch: 0,
executors: HashMap::new(),
current_epoch: Epoch::default(),
executors: HashMap::default(),
}
}
}
Expand All @@ -382,44 +337,42 @@ impl AbiExample for CachedExecutors {

impl Clone for CachedExecutors {
fn clone(&self) -> Self {
self.clone_with_epoch(self.current_epoch)
let executors = self.executors.iter().map(|(&key, entry)| {
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.prev_epoch_count,
epoch_count: AtomicU64::new(entry.epoch_count.load(Relaxed)),
executor: entry.executor.clone(),
};
(key, entry)
});
Self {
max: self.max,
current_epoch: self.current_epoch,
executors: executors.collect(),
}
}
}
impl CachedExecutors {
fn current_epoch(&self) -> Epoch {
self.current_epoch
}

fn clone_with_epoch(&self, epoch: Epoch) -> Self {
let mut executors = HashMap::new();
for (key, entry) in self.executors.iter() {
impl CachedExecutors {
fn clone_with_epoch(self: &Arc<Self>, epoch: Epoch) -> Arc<Self> {
if self.current_epoch == epoch {
return self.clone();
}
let executors = self.executors.iter().map(|(&key, entry)| {
// The total_count = prev_epoch_count + epoch_count will be used for LFU eviction.
// If the epoch has changed, we store the prev_epoch_count and reset the epoch_count to 0.
if epoch > self.current_epoch {
executors.insert(
*key,
CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::new(0),
executor: entry.executor.clone(),
},
);
} else {
executors.insert(
*key,
CachedExecutorsEntry {
prev_epoch_count: entry.prev_epoch_count,
epoch_count: AtomicU64::new(entry.epoch_count.load(Relaxed)),
executor: entry.executor.clone(),
},
);
}
}
Self {
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::default(),
executor: entry.executor.clone(),
};
(key, entry)
});
Arc::new(Self {
max: self.max,
current_epoch: epoch,
executors,
}
executors: executors.collect(),
})
}

fn new(max: usize, current_epoch: Epoch) -> Self {
Expand Down Expand Up @@ -1082,7 +1035,9 @@ pub struct Bank {
pub rewards_pool_pubkeys: Arc<HashSet<Pubkey>>,

/// Cached executors
cached_executors: RwLock<CowCachedExecutors>,
// Inner Arc is meant to implement copy-on-write semantics as opposed to
// sharing mutations (hence RwLock<Arc<...>> instead of Arc<RwLock<...>>).
cached_executors: RwLock<Arc<CachedExecutors>>,

transaction_debug_keys: Option<Arc<HashSet<Pubkey>>>,

Expand Down Expand Up @@ -1355,9 +1310,10 @@ impl Bank {
cluster_type: parent.cluster_type,
lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)),
rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(),
cached_executors: RwLock::new(
(*parent.cached_executors.read().unwrap()).clone_with_epoch(epoch),
),
cached_executors: {
let cached_executors = parent.cached_executors.read().unwrap();
RwLock::new(cached_executors.clone_with_epoch(epoch))
},
transaction_debug_keys: parent.transaction_debug_keys.clone(),
transaction_log_collector_config: parent.transaction_log_collector_config.clone(),
transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())),
Expand Down Expand Up @@ -1529,9 +1485,10 @@ impl Bank {
cluster_type: Some(genesis_config.cluster_type),
lazy_rent_collection: new(),
rewards_pool_pubkeys: new(),
cached_executors: RwLock::new(CowCachedExecutors::new(Arc::new(RwLock::new(
CachedExecutors::new(MAX_CACHED_EXECUTORS, fields.epoch),
)))),
cached_executors: RwLock::new(Arc::new(CachedExecutors::new(
MAX_CACHED_EXECUTORS,
fields.epoch,
))),
transaction_debug_keys: debug_keys,
transaction_log_collector_config: new(),
transaction_log_collector: new(),
Expand Down Expand Up @@ -3331,8 +3288,7 @@ impl Bank {
num_executors += instruction_loaders.len();
}
let mut executors = HashMap::with_capacity(num_executors);
let cow_cache = self.cached_executors.read().unwrap();
let cache = cow_cache.read().unwrap();
let cache = self.cached_executors.read().unwrap();

for key in message.account_keys.iter() {
if let Some(executor) = cache.get(key) {
Expand Down Expand Up @@ -3365,19 +3321,18 @@ impl Bank {
.collect();

if !dirty_executors.is_empty() {
let mut cow_cache = self.cached_executors.write().unwrap();
let mut cache = cow_cache.write().unwrap();
let mut cache = self.cached_executors.write().unwrap();
let cache = Arc::make_mut(&mut cache);
for (key, executor) in dirty_executors.into_iter() {
cache.put(key, executor);
}
}
}

/// Remove an executor from the bank's cache
pub fn remove_executor(&self, pubkey: &Pubkey) {
let mut cow_cache = self.cached_executors.write().unwrap();
let mut cache = cow_cache.write().unwrap();
cache.remove(pubkey);
fn remove_executor(&self, pubkey: &Pubkey) {
let mut cache = self.cached_executors.write().unwrap();
Arc::make_mut(&mut cache).remove(pubkey);
}

#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -11910,26 +11865,26 @@ pub(crate) mod tests {
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());

cache = cache.clone_with_epoch(1);
let mut cache = Arc::new(cache).clone_with_epoch(1);
assert!(cache.current_epoch == 1);

assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
cache.put(&key4, executor.clone());
Arc::make_mut(&mut cache).put(&key4, executor.clone());

assert!(cache.get(&key4).is_some());
assert!(cache.get(&key3).is_none());

cache.put(&key1, executor.clone());
cache.put(&key3, executor.clone());
Arc::make_mut(&mut cache).put(&key1, executor.clone());
Arc::make_mut(&mut cache).put(&key3, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key4).is_none());

cache = cache.clone_with_epoch(2);
assert!(cache.current_epoch == 2);

cache.put(&key3, executor.clone());
Arc::make_mut(&mut cache).put(&key3, executor.clone());
assert!(cache.get(&key3).is_some());
}

Expand Down

0 comments on commit 299a59f

Please sign in to comment.