Skip to content

Commit

Permalink
chore: cargo fmt (1.31.0)
Browse files Browse the repository at this point in the history
tidies up the overhanging chained methods, so also revert:
mozilla-services/autopush@fd48405
  • Loading branch information
pjenvey committed Dec 12, 2018
1 parent 3dde821 commit 801b1e1
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 125 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ install:
else
cargo build || travis_terminate 1
fi
rustup component add rustfmt-preview
script:
- cargo fmt -- --check
- py.test -v
- cargo test
notifications:
Expand Down
56 changes: 39 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,21 @@ where
id: Some(webpush.uaid.to_simple().to_string()),
..Default::default()
});
event.tags.insert("ua_name".to_string(), ua_result.name.to_string());
event.tags.insert("ua_os_family".to_string(), metrics_os.to_string());
event.tags.insert("ua_os_ver".to_string(), ua_result.os_version.to_string());
event.tags.insert("ua_browser_family".to_string(), metrics_browser.to_string());
event.tags.insert("ua_browser_ver".to_string(), ua_result.version.to_string());
event
.tags
.insert("ua_name".to_string(), ua_result.name.to_string());
event
.tags
.insert("ua_os_family".to_string(), metrics_os.to_string());
event
.tags
.insert("ua_os_ver".to_string(), ua_result.os_version.to_string());
event
.tags
.insert("ua_browser_family".to_string(), metrics_browser.to_string());
event
.tags
.insert("ua_browser_ver".to_string(), ua_result.version.to_string());
sentry::capture_event(event);
err.display_chain().to_string()
} else {
Expand Down Expand Up @@ -629,14 +639,23 @@ where
data: AuthClientData<T>,
},

#[state_machine_future(
transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput)
)]
#[state_machine_future(transitions(
IncrementStorage,
CheckStorage,
AwaitDropUser,
AwaitMigrateUser,
AwaitInput
))]
DetermineAck { data: AuthClientData<T> },

#[state_machine_future(
transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete)
)]
#[state_machine_future(transitions(
DetermineAck,
Send,
AwaitInput,
AwaitRegister,
AwaitUnregister,
AwaitDelete
))]
AwaitInput { data: AuthClientData<T> },

#[state_machine_future(transitions(AwaitIncrementStorage))]
Expand Down Expand Up @@ -790,9 +809,11 @@ where
let webpush_rc = data.webpush.clone();
let mut webpush = webpush_rc.borrow_mut();
match input {
Either::A(ClientMessage::Hello { .. }) => {
Err(ErrorKind::InvalidStateTransition("AwaitInput".to_string(), "Hello".to_string()).into())
}
Either::A(ClientMessage::Hello { .. }) => Err(ErrorKind::InvalidStateTransition(
"AwaitInput".to_string(),
"Hello".to_string(),
)
.into()),
Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => {
let broadcast_delta = {
let mut broadcast_subs = data.broadcast_subs.borrow_mut();
Expand Down Expand Up @@ -855,14 +876,15 @@ where
}
Either::A(ClientMessage::Nack { code, .. }) => {
// only metric codes expected from the client (or 0)
let mcode =
code.and_then(|code| {
let mcode = code
.and_then(|code| {
if code >= 301 && code <= 303 {
Some(code)
} else {
None
}
}).unwrap_or(0);
})
.unwrap_or(0);
data.srv
.metrics
.incr_with_tags("ua.command.nack")
Expand Down
184 changes: 101 additions & 83 deletions src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,46 +72,47 @@ pub fn fetch_messages(
};

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(input.clone()), cond)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> =
output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
})
})
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}

// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
let messages = notifs
retry_if(
move || ddb.query(input.clone()),
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.filter_map(|ddb_notif| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
})
})
.collect();
Ok(FetchMessageResponse {
timestamp,
messages,
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}

// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
})
})
.collect();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
}

pub fn fetch_timestamp_messages(
Expand Down Expand Up @@ -141,34 +142,36 @@ pub fn fetch_timestamp_messages(
};

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(input.clone()), cond)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
})
retry_if(
move || ddb.query(input.clone()),
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
})
.filter_map(|ddb_notif: DynamoDbNotification| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
})
})
.filter_map(|ddb_notif: DynamoDbNotification| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
})
.collect()
});
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
.collect()
});
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
}

pub fn drop_user(
Expand All @@ -184,7 +187,8 @@ pub fn drop_user(
retry_if(
move || ddb.delete_item(input.clone()),
|err: &DeleteItemError| matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error dropping user")
)
.chain_err(|| "Error dropping user")
}

pub fn get_uaid(
Expand All @@ -201,7 +205,8 @@ pub fn get_uaid(
retry_if(
move || ddb.get_item(input.clone()),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error fetching user")
)
.chain_err(|| "Error fetching user")
}

pub fn register_user(
Expand Down Expand Up @@ -233,14 +238,16 @@ pub fn register_user(
) and (
attribute_not_exists(node_id) or
(connected_at < :connected_at)
)"#.to_string(),
)"#
.to_string(),
),
return_values: Some("ALL_OLD".to_string()),
..Default::default()
})
},
|err: &PutItemError| matches!(err, &PutItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error storing user record")
)
.chain_err(|| "Error storing user record")
}

pub fn update_user_message_month(
Expand All @@ -264,9 +271,13 @@ pub fn update_user_message_month(
};

retry_if(
move || ddb.update_item(update_item.clone()).and_then(|_| future::ok(())),
move || {
ddb.update_item(update_item.clone())
.and_then(|_| future::ok(()))
},
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error updating user message month")
)
.chain_err(|| "Error updating user message month")
}

pub fn all_channels(
Expand All @@ -284,20 +295,22 @@ pub fn all_channels(
..Default::default()
};

let cond = |err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.get_item(input.clone()), cond)
.and_then(|output| {
let channels = output
.item
.and_then(|item| {
serde_dynamodb::from_hashmap::<DynamoDbNotification>(item)
.ok()
.and_then(|notif| notif.chids)
})
.unwrap_or_else(HashSet::new);
future::ok(channels)
})
.or_else(|_err| future::ok(HashSet::new()))
retry_if(
move || ddb.get_item(input.clone()),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
)
.and_then(|output| {
let channels = output
.item
.and_then(|item| {
serde_dynamodb::from_hashmap::<DynamoDbNotification>(item)
.ok()
.and_then(|notif| notif.chids)
})
.unwrap_or_else(HashSet::new);
future::ok(channels)
})
.or_else(|_err| future::ok(HashSet::new()))
}

pub fn save_channels(
Expand All @@ -324,9 +337,13 @@ pub fn save_channels(
};

retry_if(
move || ddb.update_item(update_item.clone()).and_then(|_| future::ok(())),
move || {
ddb.update_item(update_item.clone())
.and_then(|_| future::ok(()))
},
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error saving channels")
)
.chain_err(|| "Error saving channels")
}

pub fn unregister_channel_id(
Expand All @@ -353,7 +370,8 @@ pub fn unregister_channel_id(
retry_if(
move || ddb.update_item(update_item.clone()),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error unregistering channel")
)
.chain_err(|| "Error unregistering channel")
}

pub fn lookup_user(
Expand Down
Loading

0 comments on commit 801b1e1

Please sign in to comment.