Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: implement key expiration #13

Merged
merged 2 commits into from
Mar 25, 2020
Merged

server: implement key expiration #13

merged 2 commits into from
Mar 25, 2020

Conversation

carllerche
Copy link
Member

Also renames Kv -> Db as it covers more than just pure "key-value" (subscriptions).

Comment on lines +73 to +115
pub(crate) fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
let mut state = self.shared.state.lock().unwrap();

// Get and increment the next insertion ID.
let id = state.next_id;
state.next_id += 1;

// By default, no notification is needed
let mut notify = false;

let expires_at = expire.map(|duration| {
let when = Instant::now() + duration;

// Only notify the worker task if the newly inserted expiration is the
// **next** key to evict. In this case, the worker needs to be woken up
// to update its state.
notify = state.next_expiration()
.map(|expiration| expiration > when)
.unwrap_or(true);

state.expirations.insert((when, id), key.clone());
when
});

// Insert the entry.
let prev = state.entries.insert(key, Entry {
id,
data: value,
expires_at,
});

if let Some(prev) = prev {
if let Some(when) = prev.expires_at {
// clear expiration
state.expirations.remove(&(when, prev.id));
}
}

drop(state);

if notify {
self.shared.expire_task.notify();
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the value, tracking expiration

Comment on lines +144 to +194
impl Shared {
fn purge_expired_keys(&self) -> Option<Instant> {
let mut state = self.state.lock().unwrap();

// This is needed to make the borrow checker happy. In short, `lock()`
// returns a `MutexGuard` and not a `&mut State`. The borrow checker is
// not able to see "through" the mutex guard and determine that it is
// safe to access both `state.expirations` and `state.entries` mutably,
// so we get a "real" mutable reference to `State` outside of the loop.
let state = &mut *state;

// Find all keys scheduled to expire **before** now.
let now = Instant::now();

while let Some((&(when, id), key)) = state.expirations.iter().next() {
if when > now {
// Done purging, `when` is the instant at which the next key
// expires. The worker task will wait until this instant.
return Some(when);
}

// The key expired, remove it
state.entries.remove(key);
state.expirations.remove(&(when, id));
}

None
}
}

impl State {
fn next_expiration(&self) -> Option<Instant> {
self.expirations.keys().next().map(|expiration| expiration.0)
}
}

async fn purge_expired_tasks(shared: Arc<Shared>) {
loop {
// Purge all keys that are expired. The function returns the instant at
// which the **next** key will expire. The worker should wait until the
// instant has passed then purge again.
if let Some(when) = shared.purge_expired_keys() {
tokio::select! {
_ = time::delay_until(when) => {}
_ = shared.expire_task.notified() => {}
}
} else {
shared.expire_task.notified().await;
}
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purging expired keys

@LucioFranco
Copy link
Member

I like it!

src/parse.rs Outdated
Comment on lines 60 to 61
Frame::Integer(v) => Ok(v),
Frame::Simple(s) => atoi::<u64>(s.as_bytes()).ok_or(ParseError::Invalid),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth having more descriptive variable names here instead of v and s. data is probably fine for all of them?

src/db.rs Outdated
data: Bytes,

/// Instant at which the entry expires and should be removed from the
/// databaase.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
/// databaase.
/// database.

pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> {
use std::collections::hash_map::Entry;

let mut state = self.shared.state.lock().unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this error be handled / can this be an .expect so readers know what went wrong?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could... it shouldn't happen in practice (mutex poisoning). I might punt this to a "polish" pass.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, I think your point of better handling poisoned mutexes (or at least having comments) is good. We had the unwrap before and I don't know what the best answer is yet, so i am punting for this PR. I can open an issue to track.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#14

src/db.rs Outdated
Comment on lines 139 to 140
.map(|tx| tx.send(value).unwrap_or(0))
.unwrap_or(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps worth a comment noting what this is doing

@carllerche carllerche merged commit fc5597f into master Mar 25, 2020
@carllerche carllerche deleted the purge-expired-keys branch April 20, 2020 19:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants