Skip to content

Commit

Permalink
#345 get rid of unwraps in commit monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
joepio committed Apr 9, 2022
1 parent 5f2dc5f commit 5a52f9b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ By far most changes relate to `atomic-server`, so if not specified, assume the c
- Added `reset` and `show-config` commands to `atomic-server`.
- Added `data-dir` flag
- Replaced `awc` with `ureq` #374
- Get rid of `.unwrap` calls in `commit_monitor` #345

## [v0.31.1] - 2022-03-29

Expand Down
54 changes: 33 additions & 21 deletions server/src/commit_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{
actor_messages::{CommitMessage, Subscribe},
config::Config,
errors::AtomicServerResult,
handlers::web_sockets::WebSocketConnection,
search::SearchState,
};
Expand Down Expand Up @@ -78,19 +79,11 @@ impl Handler<Subscribe> for CommitMonitor {
}
}

impl Handler<CommitMessage> for CommitMonitor {
type Result = ();

impl CommitMonitor {
/// When a commit comes in, send it to any listening subscribers,
/// and update the value index.
/// The search index is only updated if the last search commit is 15 seconds or older.
// This has a bunch of .unwrap() / panics, which is not ideal.
// However, I don't want to make this a blocking call,
// I want commits to succeed (no 500 response) even if indexing fails,
// also because performance is important here -
// dealing with these indexing things synchronously would be too slow.
#[tracing::instrument(name = "handle_commit_message", skip_all, fields(subscriptions = &self.subscriptions.len(), s = %msg.commit_response.commit_resource.get_subject()))]
fn handle(&mut self, msg: CommitMessage, _: &mut Context<Self>) {
fn handle_internal(&mut self, msg: CommitMessage) -> AtomicServerResult<()> {
let target = msg.commit_response.commit_struct.subject.clone();

// Notify websocket listeners
Expand All @@ -108,25 +101,25 @@ impl Handler<CommitMessage> for CommitMonitor {
}

// Update the value index
msg.commit_response
.commit_struct
.apply_changes(msg.commit_response.resource_old.clone(), &self.store, true)
.unwrap();
msg.commit_response.commit_struct.apply_changes(
msg.commit_response.resource_old.clone(),
&self.store,
true,
)?;

// Add commit itself to the value index
for atom in msg.commit_response.commit_resource.to_atoms().unwrap() {
for atom in msg.commit_response.commit_resource.to_atoms()? {
self.store
.add_atom_to_index(&atom, &msg.commit_response.commit_resource)
.unwrap();
.add_atom_to_index(&atom, &msg.commit_response.commit_resource)?;
}

// Update the search index
if let Some(resource) = &msg.commit_response.resource_new {
if self.config.opts.remove_previous_search {
crate::search::remove_resource(&self.search_state, &target).unwrap();
crate::search::remove_resource(&self.search_state, &target)?;
};
// Add new resource to search index
crate::search::add_resource(&self.search_state, resource).unwrap();
crate::search::add_resource(&self.search_state, resource)?;

// TODO: This is not ideal, as it does not _delay_ the search index update, but it prevents it.
// Current implementation should work just fine in most scenario's.
Expand All @@ -136,12 +129,31 @@ impl Handler<CommitMessage> for CommitMonitor {
if since_last_commit > commit_duration {
// This is a slow operation!
// Commit the changset to the search index.
self.search_state.writer.write().unwrap().commit().unwrap();
self.search_state.writer.write()?.commit()?;
self.last_search_commit = now;
}
} else {
// If there is no new resource, it must have been deleted, so let's remove it from the search index.
crate::search::remove_resource(&self.search_state, &target).unwrap();
crate::search::remove_resource(&self.search_state, &target)?;
}
Ok(())
}
}

impl Handler<CommitMessage> for CommitMonitor {
type Result = ();

#[tracing::instrument(name = "handle_commit_message", skip_all, fields(subscriptions = &self.subscriptions.len(), s = %msg.commit_response.commit_resource.get_subject()))]
fn handle(&mut self, msg: CommitMessage, _: &mut Context<Self>) {
// We have moved the logic to the `handle_internal` function for decent error handling
match self.handle_internal(msg) {
Ok(_) => {}
Err(e) => {
tracing::error!(
"Handling commit in CommitMonitor failed, cache may not be fully updated: {}",
e
);
}
}
}
}
Expand Down

0 comments on commit 5a52f9b

Please sign in to comment.