-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: implement key expiration #13
Conversation
pub(crate) fn set(&self, key: String, value: Bytes, expire: Option<Duration>) { | ||
let mut state = self.shared.state.lock().unwrap(); | ||
|
||
// Get and increment the next insertion ID. | ||
let id = state.next_id; | ||
state.next_id += 1; | ||
|
||
// By default, no notification is needed | ||
let mut notify = false; | ||
|
||
let expires_at = expire.map(|duration| { | ||
let when = Instant::now() + duration; | ||
|
||
// Only notify the worker task if the newly inserted expiration is the | ||
// **next** key to evict. In this case, the worker needs to be woken up | ||
// to update its state. | ||
notify = state.next_expiration() | ||
.map(|expiration| expiration > when) | ||
.unwrap_or(true); | ||
|
||
state.expirations.insert((when, id), key.clone()); | ||
when | ||
}); | ||
|
||
// Insert the entry. | ||
let prev = state.entries.insert(key, Entry { | ||
id, | ||
data: value, | ||
expires_at, | ||
}); | ||
|
||
if let Some(prev) = prev { | ||
if let Some(when) = prev.expires_at { | ||
// clear expiration | ||
state.expirations.remove(&(when, prev.id)); | ||
} | ||
} | ||
|
||
drop(state); | ||
|
||
if notify { | ||
self.shared.expire_task.notify(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting the value, tracking expiration
impl Shared { | ||
fn purge_expired_keys(&self) -> Option<Instant> { | ||
let mut state = self.state.lock().unwrap(); | ||
|
||
// This is needed to make the borrow checker happy. In short, `lock()` | ||
// returns a `MutexGuard` and not a `&mut State`. The borrow checker is | ||
// not able to see "through" the mutex guard and determine that it is | ||
// safe to access both `state.expirations` and `state.entries` mutably, | ||
// so we get a "real" mutable reference to `State` outside of the loop. | ||
let state = &mut *state; | ||
|
||
// Find all keys scheduled to expire **before** now. | ||
let now = Instant::now(); | ||
|
||
while let Some((&(when, id), key)) = state.expirations.iter().next() { | ||
if when > now { | ||
// Done purging, `when` is the instant at which the next key | ||
// expires. The worker task will wait until this instant. | ||
return Some(when); | ||
} | ||
|
||
// The key expired, remove it | ||
state.entries.remove(key); | ||
state.expirations.remove(&(when, id)); | ||
} | ||
|
||
None | ||
} | ||
} | ||
|
||
impl State { | ||
fn next_expiration(&self) -> Option<Instant> { | ||
self.expirations.keys().next().map(|expiration| expiration.0) | ||
} | ||
} | ||
|
||
async fn purge_expired_tasks(shared: Arc<Shared>) { | ||
loop { | ||
// Purge all keys that are expired. The function returns the instant at | ||
// which the **next** key will expire. The worker should wait until the | ||
// instant has passed then purge again. | ||
if let Some(when) = shared.purge_expired_keys() { | ||
tokio::select! { | ||
_ = time::delay_until(when) => {} | ||
_ = shared.expire_task.notified() => {} | ||
} | ||
} else { | ||
shared.expire_task.notified().await; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Purging expired keys
I like it! |
src/parse.rs
Outdated
Frame::Integer(v) => Ok(v), | ||
Frame::Simple(s) => atoi::<u64>(s.as_bytes()).ok_or(ParseError::Invalid), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be worth having more descriptive variable names here instead of v
and s
. data
is probably fine for all of them?
src/db.rs
Outdated
data: Bytes, | ||
|
||
/// Instant at which the entry expires and should be removed from the | ||
/// databaase. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
/// databaase. | |
/// database. |
pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> { | ||
use std::collections::hash_map::Entry; | ||
|
||
let mut state = self.shared.state.lock().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this error be handled / can this be an .expect
so readers know what went wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could... it shouldn't happen in practice (mutex poisoning). I might punt this to a "polish" pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, I think your point of better handling poisoned mutexes (or at least having comments) is good. We had the unwrap
before and I don't know what the best answer is yet, so i am punting for this PR. I can open an issue to track.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/db.rs
Outdated
.map(|tx| tx.send(value).unwrap_or(0)) | ||
.unwrap_or(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps worth a comment noting what this is doing
Also renames
Kv
->Db
as it covers more than just pure "key-value" (subscriptions).