Skip to content

Commit

Permalink
Merge pull request #79 from mozilla-services/chore/78
Browse files Browse the repository at this point in the history
chore: upgrade dependencies
  • Loading branch information
bbangert authored Oct 5, 2018
2 parents bdc7b2b + 06eff39 commit 5f6fe05
Show file tree
Hide file tree
Showing 11 changed files with 1,075 additions and 666 deletions.
1,559 changes: 982 additions & 577 deletions Cargo.lock

Large diffs are not rendered by default.

66 changes: 31 additions & 35 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,40 @@ name = "autopush_rs"
path = "src/main.rs"

[dependencies]
base64 = "0.9.2"
bytes = "0.4.8"
cadence = "0.15.0"
chan-signal = "0.3.1"
chrono = "0.4.4"
docopt = "1.0.0"
env_logger = { version = "0.5.10", default-features = false }
base64 = "0.9.3"
bytes = "0.4.10"
cadence = "0.15.1"
chan-signal = "0.3.2"
chrono = "0.4.6"
config = "0.9.1"
docopt = "1.0.1"
env_logger = { version = "0.5.13", default-features = false }
error-chain = "0.12.0"
fernet = "0.1.0"
futures = "0.1.23"
futures = "0.1.24"
futures-backoff = "0.1.0"
hex = "0.3.2"
httparse = "1.3.2"
httparse = "1.3.3"
# XXX: pin to hyper 0.11 for now: 0.12 has many changes..
hyper = "0.11.27"
lazy_static = "1.0.1"
libc = "0.2.42"
log = { version = "0.4.3", features = ["max_level_info", "release_max_level_info"] }
matches = "0.1.6"
lazy_static = "1.1.0"
libc = "0.2.43"
log = { version = "0.4.5", features = ["max_level_info", "release_max_level_info"] }
matches = "0.1.8"
mozsvc-common = "0.1.0"
openssl = "0.10.10"
rand = "0.5.4"
regex = "1.0.1"
reqwest = { version = "0.8.6", features = ["unstable"] }
rusoto_core = "0.32.0"
rusoto_credential = "0.11.0"
rusoto_dynamodb = "0.32.0"
sentry = { version = "0.9.0", features = ["with_error_chain"] }
# XXX: pinned until server side's upgraded
serde = "1.0.70"
serde_derive = "1.0.70"
serde_dynamodb = "0.1.2"
serde_json = "1.0.22"
slog = { version = "2.2.3", features = ["max_level_trace", "release_max_level_info"] }
openssl = "0.10.12"
rand = "0.5.5"
regex = "1.0.5"
reqwest = "0.9.2"
rusoto_core = "0.34.0"
rusoto_credential = "0.13.0"
rusoto_dynamodb = "0.34.0"
sentry = { version = "0.12.0", features = ["with_error_chain"] }
serde = "1.0.79"
serde_derive = "1.0.79"
serde_dynamodb = { git = "https://github.com/mockersf/serde_dynamodb", rev = "ac0726dcc944aa25a107529d0f29c5be7dbfb436" }
serde_json = "1.0.31"
slog = { version = "2.4.0", features = ["max_level_trace", "release_max_level_info"] }
slog-async = "2.3.0"
slog-term = "2.4.0"
slog-mozlog-json = "0.1.0"
Expand All @@ -59,15 +59,11 @@ slog-stdlog = "3.0.2"
state_machine_future = "0.1.7"
time = "0.1.40"
tokio-core = "0.1.17"
tokio-io = "0.1.7"
tokio-io = "0.1.9"
tokio-openssl = "0.2.1"
tokio-service = "0.1.0"
tokio-tungstenite = { version = "0.5.1", default-features = false }
tungstenite = { version = "0.5.4", default-features = false }
uuid = { version = "0.6.5", features = ["serde", "v4"] }
tokio-tungstenite = { version = "0.6.0", default-features = false }
tungstenite = { version = "0.6.0", default-features = false }
uuid = { version = "0.7.1", features = ["serde", "v4"] }
# XXX: pin woothee until >= 0.8.1
woothee = "0.7.3"

[dependencies.config]
git = "https://github.com/mehcode/config-rs"
rev = "e8fa9fee96185ddd18ebcef8a925c75459111edb"
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
SHELL := /bin/sh
CARGO = cargo

.PHONY: ddb

ddb:
mkdir $@
curl -sSL http://dynamodb-local.s3-website-us-west-2.amazonaws.com/dynamodb_local_latest.tar.gz | tar xzvC $@

upgrade:
$(CARGO) install cargo-edit ||
echo "\n$(CARGO) install cargo-edit failed, continuing.."
$(CARGO) upgrade
$(CARGO) update
15 changes: 7 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::sync::mpsc;
use futures::sync::oneshot::Receiver;
use futures::AsyncSink;
use futures::{Async, Future, Poll, Sink, Stream};
use reqwest::unstable::async::Client as AsyncClient;
use reqwest::async::Client as AsyncClient;
use rusoto_dynamodb::UpdateItemOutput;
use sentry;
use sentry::integrations::error_chain::event_from_error_chain;
Expand Down Expand Up @@ -418,7 +418,7 @@ where
message_month,
connected_at,
stats: SessionStatistics {
uaid: uaid.simple().to_string(),
uaid: uaid.to_simple().to_string(),
uaid_reset: reset_uaid,
existing_uaid: check_storage,
connection_type: String::from("webpush"),
Expand All @@ -429,7 +429,7 @@ where
srv.connect_client(RegisteredClient { uaid, uid, tx });

let response = ServerMessage::Hello {
uaid: uaid.simple().to_string(),
uaid: uaid.to_simple().to_string(),
status: 200,
use_webpush: Some(true),
broadcasts,
Expand Down Expand Up @@ -507,7 +507,7 @@ where
let error = if let Some(ref err) = error {
let mut event = event_from_error_chain(err);
event.user = Some(sentry::User {
id: Some(webpush.uaid.simple().to_string()),
id: Some(webpush.uaid.to_simple().to_string()),
..Default::default()
});
event.tags.insert("ua_name".to_string(), ua_result.name.to_string());
Expand Down Expand Up @@ -568,7 +568,6 @@ fn save_and_notify_undelivered_messages(
notifs: Vec<Notification>,
) {
let srv2 = srv.clone();
let srv3 = srv.clone();
let uaid = webpush.uaid.clone();
let connected_at = webpush.connected_at.clone();
srv.handle.spawn(
Expand All @@ -586,7 +585,7 @@ fn save_and_notify_undelivered_messages(
} else if let Some(node_id) = user.node_id {
let result = AsyncClient::builder()
.timeout(Duration::from_secs(1))
.build(&srv3.handle);
.build();
if let Ok(client) = result {
future::ok((client, user.uaid, node_id))
} else {
Expand All @@ -598,7 +597,7 @@ fn save_and_notify_undelivered_messages(
})
.and_then(|(client, uaid, node_id)| {
// Send the notify to the user
let notify_url = format!("{}/notif/{}", node_id, uaid.simple());
let notify_url = format!("{}/notif/{}", node_id, uaid.to_simple());
client
.put(&notify_url)
.send()
Expand Down Expand Up @@ -822,7 +821,7 @@ where
"channel_id" => &channel_id_str);
let channel_id =
Uuid::parse_str(&channel_id_str).chain_err(|| "Invalid channelID")?;
if channel_id.hyphenated().to_string() != channel_id_str {
if channel_id.to_hyphenated().to_string() != channel_id_str {
return Err("Bad UUID format, use lower case, dashed format".into());
}

Expand Down
45 changes: 24 additions & 21 deletions src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ fn has_connected_this_month(user: &DynamoDbUser) -> bool {
})
}

pub fn list_tables(
/// A blocking list_tables call only called during initialization
/// (prior to an any active tokio executor)
pub fn list_tables_sync(
ddb: Rc<Box<DynamoDb>>,
start_key: Option<String>,
) -> impl Future<Item = ListTablesOutput, Error = Error> {
) -> Result<ListTablesOutput> {
let input = ListTablesInput {
exclusive_start_table_name: start_key,
limit: Some(100),
};
ddb.list_tables(&input)
ddb.list_tables(input)
.sync()
.chain_err(|| "Unable to list tables")
}

Expand All @@ -56,7 +59,7 @@ pub fn fetch_messages(
limit: u32,
) -> impl Future<Item = FetchMessageResponse, Error = Error> {
let attr_values = hashmap! {
":uaid".to_string() => val!(S => uaid.simple().to_string()),
":uaid".to_string() => val!(S => uaid.to_simple().to_string()),
":cmi".to_string() => val!(S => "02"),
};
let input = QueryInput {
Expand All @@ -70,7 +73,7 @@ pub fn fetch_messages(

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
retry_if(move || ddb.query(input.clone()), cond)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> =
Expand Down Expand Up @@ -125,7 +128,7 @@ pub fn fetch_timestamp_messages(
"01;".to_string()
};
let attr_values = hashmap! {
":uaid".to_string() => val!(S => uaid.simple().to_string()),
":uaid".to_string() => val!(S => uaid.to_simple().to_string()),
":cmi".to_string() => val!(S => range_key),
};
let input = QueryInput {
Expand All @@ -139,7 +142,7 @@ pub fn fetch_timestamp_messages(

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
retry_if(move || ddb.query(input.clone()), cond)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
Expand Down Expand Up @@ -175,11 +178,11 @@ pub fn drop_user(
) -> impl Future<Item = DeleteItemOutput, Error = Error> {
let input = DeleteItemInput {
table_name: router_table_name.to_string(),
key: ddb_item! { uaid: s => uaid.simple().to_string() },
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};
retry_if(
move || ddb.delete_item(&input),
move || ddb.delete_item(input.clone()),
|err: &DeleteItemError| matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error dropping user")
}
Expand All @@ -192,11 +195,11 @@ pub fn get_uaid(
let input = GetItemInput {
table_name: router_table_name.to_string(),
consistent_read: Some(true),
key: ddb_item! { uaid: s => uaid.simple().to_string() },
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};
retry_if(
move || ddb.get_item(&input),
move || ddb.get_item(input.clone()),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error fetching user")
}
Expand All @@ -219,7 +222,7 @@ pub fn register_user(
retry_if(
move || {
debug!("Registering user: {:?}", item);
ddb.put_item(&PutItemInput {
ddb.put_item(PutItemInput {
item: item.clone(),
table_name: router_table.clone(),
expression_attribute_values: Some(attr_values.clone()),
Expand Down Expand Up @@ -251,7 +254,7 @@ pub fn update_user_message_month(
":lastconnect".to_string() => val!(N => generate_last_connect().to_string()),
};
let update_item = UpdateItemInput {
key: ddb_item! { uaid: s => uaid.simple().to_string() },
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
update_expression: Some(
"SET current_month=:curmonth, last_connect=:lastconnect".to_string(),
),
Expand All @@ -261,7 +264,7 @@ pub fn update_user_message_month(
};

retry_if(
move || ddb.update_item(&update_item).and_then(|_| future::ok(())),
move || ddb.update_item(update_item.clone()).and_then(|_| future::ok(())),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error updating user message month")
}
Expand All @@ -275,14 +278,14 @@ pub fn all_channels(
table_name: message_table_name.to_string(),
consistent_read: Some(true),
key: ddb_item! {
uaid: s => uaid.simple().to_string(),
uaid: s => uaid.to_simple().to_string(),
chidmessageid: s => " ".to_string()
},
..Default::default()
};

let cond = |err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.get_item(&input), cond)
retry_if(move || ddb.get_item(input.clone()), cond)
.and_then(|output| {
let channels = output
.item
Expand Down Expand Up @@ -311,7 +314,7 @@ pub fn save_channels(
};
let update_item = UpdateItemInput {
key: ddb_item! {
uaid: s => uaid.simple().to_string(),
uaid: s => uaid.to_simple().to_string(),
chidmessageid: s => " ".to_string()
},
update_expression: Some("ADD chids :chids SET expiry=:expiry".to_string()),
Expand All @@ -321,7 +324,7 @@ pub fn save_channels(
};

retry_if(
move || ddb.update_item(&update_item).and_then(|_| future::ok(())),
move || ddb.update_item(update_item.clone()).and_then(|_| future::ok(())),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error saving channels")
}
Expand All @@ -332,13 +335,13 @@ pub fn unregister_channel_id(
channel_id: &Uuid,
message_table_name: &str,
) -> impl Future<Item = UpdateItemOutput, Error = Error> {
let chid = channel_id.hyphenated().to_string();
let chid = channel_id.to_hyphenated().to_string();
let attr_values = hashmap! {
":channel_id".to_string() => val!(SS => vec![chid]),
};
let update_item = UpdateItemInput {
key: ddb_item! {
uaid: s => uaid.simple().to_string(),
uaid: s => uaid.to_simple().to_string(),
chidmessageid: s => " ".to_string()
},
update_expression: Some("DELETE chids :channel_id".to_string()),
Expand All @@ -348,7 +351,7 @@ pub fn unregister_channel_id(
};

retry_if(
move || ddb.update_item(&update_item),
move || ddb.update_item(update_item.clone()),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error unregistering channel")
}
Expand Down
Loading

0 comments on commit 5f6fe05

Please sign in to comment.