From 86496370b79d4756d0538c61b511b47e9d848ceb Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Wed, 18 Apr 2018 14:54:09 -0700 Subject: [PATCH] feat: refactor client state machine Co-authored-by: Phil Jenvey Switch to using the futures state machine to separate out client state from the implementation. Closes #1181 --- autopush_rs/Cargo.lock | 139 +++ autopush_rs/Cargo.toml | 2 + autopush_rs/src/client.rs | 1529 ++++++++++++++++------------- autopush_rs/src/lib.rs | 2 + autopush_rs/src/protocol.rs | 8 +- autopush_rs/src/server/mod.rs | 10 +- autopush_rs/src/util/megaphone.rs | 4 +- 7 files changed, 1016 insertions(+), 678 deletions(-) diff --git a/autopush_rs/Cargo.lock b/autopush_rs/Cargo.lock index 79257787..41a2f881 100644 --- a/autopush_rs/Cargo.lock +++ b/autopush_rs/Cargo.lock @@ -65,6 +65,7 @@ dependencies = [ "slog-scope 4.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-stdlog 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "state_machine_future 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -259,6 +260,48 @@ dependencies = [ "generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "darling" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "darling_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "darling_macro 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "darling_core" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ident_case 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "darling_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "darling_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "derive_state_machine_future" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "darling 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "petgraph 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "digest" version = "0.7.2" @@ -312,6 +355,11 @@ name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "fixedbitset" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "foreign-types" version = "0.3.2" @@ -379,6 +427,14 @@ dependencies = [ "typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "heck" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-segmentation 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hex" version = "0.3.1" @@ -453,6 +509,11 @@ dependencies = [ "tokio-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ident_case" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "idna" version = "0.1.4" @@ -712,11 +773,25 @@ dependencies = [ "vcpkg 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ordermap" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "percent-encoding" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "petgraph" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "ordermap 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "phf" version = "0.7.21" @@ -770,6 +845,11 @@ name = "quick-error" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "quote" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quote" version = "0.5.1" @@ -847,6 +927,11 @@ dependencies = [ "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rent_to_own" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "reqwest" version = "0.8.5" @@ -1153,6 +1238,26 @@ name = "smallvec" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "state_machine_future" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "derive_state_machine_future 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "rent_to_own 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "syn" +version = "0.11.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "syn" version = "0.13.1" @@ -1163,6 +1268,14 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "synom" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "take" version = "0.1.0" @@ -1454,6 +1567,16 @@ name = "unicode-normalization" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "unicode-segmentation" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "unicode-xid" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "unicode-xid" version = "0.1.0" @@ -1621,6 +1744,10 @@ dependencies = [ "checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" "checksum crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d636a8b3bcc1b409d7ffd3facef8f21dcb4009626adbd0c5e6c4305c07253c7b" "checksum crypto-mac 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0999b4ff4d3446d4ddb19a63e9e00c1876e75cd7000d20e57a693b4b3f08d958" +"checksum darling 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1630fdbe3554154a50624487c79b0140a424e87dc08061db1a2211359792acab" +"checksum darling_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d12d2eeb837786ace70b6bca9adfeaef4352cc68d6a42e8e3d0c4159bbca7ab2" +"checksum darling_macro 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "01581bdeabb86f69970dbd9e6ee3c61963f9a7321169589e3dffa16033c0928c" +"checksum derive_state_machine_future 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "54e84dd4e2e6b94edda02aaae8fd8d02f68404817c89183e16d217bb380d08e8" "checksum digest 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "00a49051fef47a72c9623101b19bd71924a45cca838826caae3eaa4d00772603" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" "checksum encoding_rs 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "98fd0f24d1fb71a4a6b9330c8ca04cbd4e7cc5d846b54ca74ff376bc7c9f798d" @@ -1628,6 +1755,7 @@ dependencies = [ "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" "checksum error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" @@ -1637,6 +1765,7 @@ dependencies = [ "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a5cedfe9b6dc756220782cc1ba5bcb1fa091cdcba155e40d3556159c3db58043" "checksum generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef25c5683767570c2bbd7deba372926a55eaae9982d7726ee2a1050239d45b9d" +"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" "checksum hex 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "459d3cf58137bb02ad4adeef5036377ff59f066dbb82517b7192e3a5462a2abc" "checksum hmac 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44f3bdb08579d99d7dc761c0e266f13b5f2ab8c8c703b9fc9ef333cd8f48f55e" "checksum hostname 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "58fab6e177434b0bb4cd344a4dabaa5bd6d7a8d792b1885aebcae7af1091d1cb" @@ -1644,6 +1773,7 @@ dependencies = [ "checksum humantime 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0484fda3e7007f2a4a0d9c3a703ca38c71c54c55602ce4660c419fd32e188c9e" "checksum hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)" = "549dbb86397490ce69d908425b9beebc85bbaad25157d67479d4995bb56fdf9a" "checksum hyper-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a5aa51f6ae9842239b0fac14af5f22123b8432b4cc774a44ff059fcba0f675ca" +"checksum ident_case 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3c9826188e666f2ed92071d2dadef6edc430b11b158b5b2b3f4babbcc891eaaa" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" "checksum input_buffer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "64fc52dd2f15e7ce28663e4eada58f457aa8c220044d531c3b8d56a8781af9b1" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" @@ -1675,7 +1805,9 @@ dependencies = [ "checksum openssl 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)" = "63246f69962e8d5ef865f82a65241d6483c8a2905a1801e2f7feb5d187d51320" "checksum openssl 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "a3605c298474a3aa69de92d21139fb5e2a81688d308262359d85cdd0d12a7985" "checksum openssl-sys 0.9.28 (registry+https://github.com/rust-lang/crates.io-index)" = "0bbd90640b148b46305c1691eed6039b5c8509bed16991e3562a01eeb76902a3" +"checksum ordermap 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a86ed3f5f244b372d6b1a00b72ef7f8876d0bc6a78a4c9985c53614041512063" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +"checksum petgraph 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "8b30dc85588cd02b9b76f5e386535db546d21dc68506cff2abebee0b6445e8e4" "checksum phf 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "cb325642290f28ee14d8c6201159949a872f220c62af6e110a56ea914fbe42fc" "checksum phf_codegen 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "d62594c0bb54c464f633175d502038177e90309daf2e0158be42ed5f023ce88f" "checksum phf_generator 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "6b07ffcc532ccc85e3afc45865469bf5d9e4ef5bfcf9622e3cfe80c2d275ec03" @@ -1683,6 +1815,7 @@ dependencies = [ "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum proc-macro2 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "49b6a521dc81b643e9a51e0d1cf05df46d5a2f3c0280ea72bcb68276ba64a118" "checksum quick-error 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eda5fe9b71976e62bc81b781206aaa076401769b2143379d3eb2118388babac4" +"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum quote 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7b0ff51282f28dc1b53fd154298feaa2e77c5ea0dba68e1fd8b03b72fbe13d2a" "checksum rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "15a732abf9d20f0ad8eeb6f909bf6868722d9a06e1e50802b6a70351f40b4eb1" "checksum rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eba5f8cb59cc50ed56be8880a5c7b496bfd9bd26394e176bc67884094145c2c5" @@ -1692,6 +1825,7 @@ dependencies = [ "checksum regex-syntax 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "bd90079345f4a4c3409214734ae220fd773c6f2e8a543d07370c6c1c369cfbfb" "checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a" "checksum remove_dir_all 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dfc5b3ce5d5ea144bb04ebd093a9e14e9765bcfec866aecda9b6dec43b3d1e24" +"checksum rent_to_own 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05a51ad2b1c5c710fa89e6b1631068dab84ed687bc6a5fe061ad65da3d0c25b2" "checksum reqwest 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)" = "241faa9a8ca28a03cbbb9815a5d085f271d4c0168a19181f106aa93240c22ddb" "checksum rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12daaa6d62d64f6447bf0299ce775f4e05f8e75e5418e817da094b9de04ad22d" "checksum rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "53199d09fd1b7d4f5ac50f4d23106577624238ea77cae2b44eb1d1fc4cd956a4" @@ -1724,7 +1858,10 @@ dependencies = [ "checksum slog-stdlog 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ac42f8254ae996cc7d640f9410d3b048dcdf8887a10df4d5d4c44966de24c4a8" "checksum slog-term 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5951a808c40f419922ee014c15b6ae1cd34d963538b57d8a4778b9ca3fff1e0b" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" +"checksum state_machine_future 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eaafbb574dda413e09727f3a534af6837756c9edb69691c120a3240fa30179da" +"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum syn 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)" = "91b52877572087400e83d24b9178488541e3d535259e04ff17a63df1e5ceff59" +"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" @@ -1754,6 +1891,8 @@ dependencies = [ "checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a" "checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" "checksum unicode-normalization 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "51ccda9ef9efa3f7ef5d91e8f9b83bbe6955f9bf86aec89d5cce2c874625920f" +"checksum unicode-segmentation 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a8083c594e02b8ae1654ae26f0ade5158b119bd88ad0e8227a5d8fcd72407946" +"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f808aadd8cfec6ef90e4a14eb46f24511824d1ac596b9682703c87056c8678b7" diff --git a/autopush_rs/Cargo.toml b/autopush_rs/Cargo.toml index 177ecb82..3c2e32b0 100644 --- a/autopush_rs/Cargo.toml +++ b/autopush_rs/Cargo.toml @@ -40,6 +40,8 @@ slog-term = "2.3.0" slog-json = "2.2.0" slog-scope = "4.0.1" slog-stdlog = "3.0.2" +# state_machine_future = { version = "0.1.6", features = ["debug_code_generation"] } +state_machine_future = "0.1.6" time = "0.1.39" tokio-core = "0.1.16" tokio-io = "0.1.6" diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index baea1ea0..23d6de9a 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -4,8 +4,9 @@ //! this'll house all the various state machine transitions and state management //! of connected clients. Note that it's expected there'll be a lot of connected //! clients, so this may appears relatively heavily optimized! - +use std::cell::RefCell; use std::collections::HashMap; +use std::mem; use std::rc::Rc; use cadence::prelude::*; @@ -13,10 +14,11 @@ use futures::AsyncSink; use futures::future::Either; use futures::sync::mpsc; use futures::sync::oneshot::Receiver; -use futures::{Stream, Sink, Future, Poll, Async}; +use futures::{Async, Future, Poll, Sink, Stream}; use futures_backoff::retry_if; use rusoto_dynamodb::{AttributeValue, DynamoDb}; -use rusoto_dynamodb::{UpdateItemInput, UpdateItemOutput, UpdateItemError}; +use rusoto_dynamodb::{UpdateItemError, UpdateItemInput, UpdateItemOutput}; +use state_machine_future::RentToOwn; use tokio_core::reactor::Timeout; use time; use uuid::Uuid; @@ -24,21 +26,114 @@ use woothee::parser::Parser; use call; use errors::*; -use protocol::{ClientAck, ClientMessage, ServerMessage, ServerNotification, Notification}; +use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification}; use server::Server; use util::parse_user_agent; use util::megaphone::{ClientServices, Service, ServiceClientInit}; const MAX_EXPIRY: u64 = 2592000; +// Created and handed to the AutopushServer pub struct RegisteredClient { pub uaid: Uuid, pub uid: Uuid, pub tx: mpsc::UnboundedSender, } +pub struct Client +where + T: Stream + + Sink + + 'static, +{ + state_machine: UnAuthClientStateFuture, + srv: Rc, + broadcast_services: Rc>, + tx: mpsc::UnboundedSender, +} + +impl Client +where + T: Stream + + Sink + + 'static, +{ + /// Spins up a new client communicating over the websocket `ws` specified. + /// + /// The `ws` specified already has ping/pong parts of the websocket + /// protocol managed elsewhere, and this struct is only expected to deal + /// with webpush-specific messages. + /// + /// The `srv` argument is the server that this client is attached to and + /// the various state behind the server. This provides transitive access to + /// various configuration options of the server as well as the ability to + /// call back into Python. + pub fn new(ws: T, srv: &Rc, mut uarx: Receiver, host: String) -> Client { + let srv = srv.clone(); + let timeout = Timeout::new(srv.opts.open_handshake_timeout.unwrap(), &srv.handle).unwrap(); + let (tx, rx) = mpsc::unbounded(); + + // Pull out the user-agent, which we should have by now + let uastr = match uarx.poll() { + Ok(Async::Ready(ua)) => ua, + Ok(Async::NotReady) => { + error!("Failed to parse the user-agent"); + String::from("") + } + Err(_) => { + error!("Failed to receive a value"); + String::from("") + } + }; + + let broadcast_services = Rc::new(RefCell::new(Default::default())); + let sm = UnAuthClientState::start( + UnAuthClientData { + srv: srv.clone(), + ws, + user_agent: uastr, + host, + broadcast_services: broadcast_services.clone(), + }, + timeout, + tx.clone(), + rx, + ); + + Client { + state_machine: sm, + srv: srv.clone(), + broadcast_services, + tx, + } + } + + pub fn broadcast_delta(&mut self) -> Option> { + let mut broadcast_services = self.broadcast_services.borrow_mut(); + self.srv.broadcast_delta(&mut broadcast_services) + } + + pub fn shutdown(&mut self) { + let _result = self.tx.unbounded_send(ServerNotification::Disconnect); + } +} + +impl Future for Client +where + T: Stream + + Sink + + 'static, +{ + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll<(), Error> { + self.state_machine.poll() + } +} + // Websocket session statistics -#[derive(Clone)] +#[derive(Clone, Default)] struct SessionStatistics { // User data uaid: String, @@ -57,27 +152,12 @@ struct SessionStatistics { registers: i32, } -// Represents a websocket client connection that may or may not be authenticated -pub struct Client { - data: ClientData, - state: ClientState, -} - -pub struct ClientData { - webpush: Option, - srv: Rc, - ws: T, - user_agent: String, - host: String, -} - // Represent the state for a valid WebPush client that is authenticated pub struct WebPushClient { uaid: Uuid, uid: Uuid, rx: mpsc::UnboundedReceiver, flags: ClientFlags, - broadcast_services: ClientServices, message_month: String, unacked_direct_notifs: Vec, unacked_stored_notifs: Vec, @@ -88,12 +168,31 @@ pub struct WebPushClient { stats: SessionStatistics, } +impl Default for WebPushClient { + fn default() -> WebPushClient { + let (_, rx) = mpsc::unbounded(); + WebPushClient { + uaid: Default::default(), + uid: Default::default(), + rx, + flags: Default::default(), + message_month: Default::default(), + unacked_direct_notifs: Default::default(), + unacked_stored_notifs: Default::default(), + unacked_stored_highest: Default::default(), + connected_at: Default::default(), + stats: Default::default(), + } + } +} + impl WebPushClient { fn unacked_messages(&self) -> bool { self.unacked_stored_notifs.len() > 0 || self.unacked_direct_notifs.len() > 0 } } +#[derive(Default)] pub struct ClientFlags { include_topic: bool, increment_storage: bool, @@ -112,660 +211,278 @@ impl ClientFlags { rotate_message_table: false, } } - - pub fn none(&self) -> bool { - // Indicate if none of the flags are true. - match *self { - ClientFlags { - include_topic: false, - increment_storage: false, - check: false, - reset_uaid: false, - rotate_message_table: false, - } => true, - _ => false, - } - } } -pub enum ClientState { - WaitingForHello(Timeout), - WaitingForProcessHello(MyFuture, Vec), - WaitingForRegister(Uuid, MyFuture), - WaitingForUnRegister(Uuid, MyFuture), - WaitingForCheckStorage(MyFuture), - WaitingForDelete(MyFuture), - WaitingForIncrementStorage(MyFuture), - WaitingForDropUser(MyFuture), - WaitingForMigrateUser(MyFuture), - FinishSend(Option, Option>), - SendMessages(Option>), - CheckStorage, - IncrementStorage, - Await, - Done, - ShutdownCleanup(Option), +pub struct UnAuthClientData { + srv: Rc, + ws: T, + user_agent: String, + host: String, + broadcast_services: Rc>, } -impl Client +impl UnAuthClientData where T: Stream + Sink + 'static, { - /// Spins up a new client communicating over the websocket `ws` specified. - /// - /// The `ws` specified already has ping/pong parts of the websocket - /// protocol managed elsewhere, and this struct is only expected to deal - /// with webpush-specific messages. - /// - /// The `srv` argument is the server that this client is attached to and - /// the various state behind the server. This provides transitive access to - /// various configuration options of the server as well as the ability to - /// call back into Python. - pub fn new(ws: T, srv: &Rc, mut uarx: Receiver, host: String) -> Client { - let srv = srv.clone(); - let timeout = Timeout::new(srv.opts.open_handshake_timeout.unwrap(), &srv.handle).unwrap(); - - // Pull out the user-agent, which we should have by now - let uastr = match uarx.poll() { - Ok(Async::Ready(ua)) => ua, - Ok(Async::NotReady) => { - error!("Failed to parse the user-agent"); - String::from("") - } - Err(_) => { - error!("Failed to receive a value"); - String::from("") - } + fn input_with_timeout(&mut self, timeout: &mut Timeout) -> Poll { + let item = match timeout.poll()? { + Async::Ready(_) => return Err("Client timed out".into()), + Async::NotReady => match self.ws.poll()? { + Async::Ready(None) => return Err("Client dropped".into()), + Async::Ready(Some(msg)) => Async::Ready(msg), + Async::NotReady => Async::NotReady, + }, }; + Ok(item) + } +} - Client { - state: ClientState::WaitingForHello(timeout), - data: ClientData { - webpush: None, - srv: srv.clone(), - ws: ws, - user_agent: uastr, - host, +pub struct AuthClientData { + srv: Rc, + ws: T, + webpush: Rc>, + broadcast_services: Rc>, +} + +impl AuthClientData +where + T: Stream + + Sink + + 'static, +{ + fn input_or_notif(&mut self) -> Poll, Error> { + let mut webpush = self.webpush.borrow_mut(); + let item = match webpush.rx.poll() { + Ok(Async::Ready(Some(notif))) => Either::B(notif), + Ok(Async::Ready(None)) => return Err("Sending side dropped".into()), + Ok(Async::NotReady) => match self.ws.poll()? { + Async::Ready(None) => return Err("Client dropped".into()), + Async::Ready(Some(msg)) => Either::A(msg), + Async::NotReady => return Ok(Async::NotReady), }, - } + Err(_) => return Err("Unexpected error".into()), + }; + Ok(Async::Ready(item)) } +} - pub fn shutdown(&mut self) { - self.data.shutdown(); - } +#[derive(StateMachineFuture)] +pub enum UnAuthClientState +where + T: Stream + + Sink + + 'static, +{ + #[state_machine_future(start, transitions(AwaitProcessHello))] + AwaitHello { + data: UnAuthClientData, + timeout: Timeout, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + }, - pub fn broadcast_delta(&mut self) -> Option> { - if let Some(ref mut webpush) = self.data.webpush { - self.data.srv.broadcast_delta(&mut webpush.broadcast_services) - } else { - None - } - } + #[state_machine_future(transitions(AwaitSessionComplete))] + AwaitProcessHello { + response: MyFuture, + data: UnAuthClientData, + interested_broadcasts: Vec, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + }, - fn transition(&mut self) -> Poll { - let host = self.data.host.clone(); - let next_state = match self.state { - ClientState::FinishSend(None, None) => { - return Err("Bad state, should not have nothing to do".into()) - } - ClientState::FinishSend(None, ref mut next_state) => { - debug!("State: FinishSend w/next_state"); - try_ready!(self.data.ws.poll_complete()); - *next_state.take().unwrap() - } - ClientState::FinishSend(ref mut msg, ref mut next_state) => { - debug!("State: FinishSend w/msg & next_state"); - let item = msg.take().unwrap(); - if let ServerMessage::Notification(ref notif) = item { - debug!("Sending message: {:?}", notif); - } - let ret = self.data.ws.start_send(item).chain_err(|| "unable to send")?; - match ret { - AsyncSink::Ready => { - ClientState::FinishSend(None, Some(next_state.take().unwrap())) - } - AsyncSink::NotReady(returned) => { - *msg = Some(returned); - return Ok(Async::NotReady); - } - } - } - ClientState::SendMessages(ref mut more_messages) => { - debug!("State: SendMessages"); - if more_messages.is_some() { - let mut messages = more_messages.take().unwrap(); - if messages.len() > 0 { - let message = messages.remove(0); - if message.topic.is_some() { - self.data.srv.metrics.incr("ua.notification.topic")?; - } - let mlen = message.data.as_ref().map_or(0, |d| d.len() as i64); - // XXX: not emitted for direct notifications (nor was it in websocket.py) - self.data.srv.metrics - .count_with_tags("ua.message_data", mlen) - .with_tag("source", "Stored") - .send()?; - ClientState::FinishSend( - Some(ServerMessage::Notification(message)), - Some(Box::new(ClientState::SendMessages(if messages.len() > 0 { - Some(messages) - } else { - None - }))), - ) - } else { - ClientState::SendMessages(if messages.len() > 0 { - Some(messages) - } else { - None - }) - } - } else { - ClientState::Await - } - } - ClientState::CheckStorage => { - debug!("State: CheckStorage"); - let webpush = self.data.webpush.as_ref().unwrap(); - ClientState::WaitingForCheckStorage(self.data.srv.check_storage( - webpush.uaid.simple().to_string(), - webpush.message_month.clone(), - webpush.flags.include_topic, - webpush.unacked_stored_highest, - )) - } - ClientState::IncrementStorage => { - debug!("State: IncrementStorage"); - self.data.process_increment_storage() - } - ClientState::WaitingForHello(ref mut timeout) => { - debug!("State: WaitingForHello"); - let (uaid, services) = match try_ready!(self.data.input_with_timeout(timeout)) { - ClientMessage::Hello { - uaid, - use_webpush: Some(true), - broadcasts, - .. - } => ( - uaid.and_then(|uaid| Uuid::parse_str(uaid.as_str()).ok()), - Service::from_hashmap(broadcasts.unwrap_or(HashMap::new())) - ), - _ => return Err("Invalid message, must be hello".into()), - }; - let connected_at = time::precise_time_ns() / 1000; - ClientState::WaitingForProcessHello( - self.data.srv.hello(&connected_at, uaid.as_ref()), - services, - ) - } - ClientState::WaitingForProcessHello(ref mut response, ref services) => { - debug!("State: WaitingForProcessHello"); - match try_ready!(response.poll()) { - call::HelloResponse { - uaid: Some(uaid), - message_month, - check_storage, - reset_uaid, - rotate_message_table, - connected_at, - } => { - self.data.process_hello( - uaid, - message_month, - reset_uaid, - rotate_message_table, - check_storage, - connected_at, - services, - ) - } - call::HelloResponse { uaid: None, .. } => { - return Err("Already connected elsewhere".into()) - } - } - } - ClientState::WaitingForCheckStorage(ref mut response) => { - debug!("State: WaitingForCheckStorage"); - let (include_topic, mut messages, timestamp) = match try_ready!(response.poll()) { - call::CheckStorageResponse { - include_topic, - messages, - timestamp, - } => (include_topic, messages, timestamp), - }; - debug!("Got checkstorage response"); - let webpush = self.data.webpush.as_mut().unwrap(); - webpush.flags.include_topic = include_topic; - debug!("Setting unacked stored highest to {:?}", timestamp); - webpush.unacked_stored_highest = timestamp; - if messages.len() > 0 { - // Filter out TTL expired messages - let now = time::get_time().sec as u32; - messages.retain(|ref msg| now < msg.ttl + msg.timestamp); - webpush.flags.increment_storage = !include_topic && timestamp.is_some(); - // If there's still messages send them out - if messages.len() > 0 { - webpush.unacked_stored_notifs.extend( - messages.iter().cloned(), - ); - let message = ServerMessage::Notification(messages.remove(0)); - ClientState::FinishSend( - Some(message), - Some(Box::new(ClientState::SendMessages(Some(messages)))), - ) - } else { - // No messages remaining - ClientState::FinishSend( - None, - Some(Box::new(ClientState::Await)) - ) - } - } else { - webpush.flags.check = false; - ClientState::Await - } - } - ClientState::WaitingForIncrementStorage(ref mut response) => { - debug!("State: WaitingForIncrementStorage"); - try_ready!(response.poll()); - self.data.webpush.as_mut().unwrap().flags.increment_storage = false; - ClientState::Await - } - ClientState::WaitingForMigrateUser(ref mut response) => { - debug!("State: WaitingForMigrateUser"); - let message_month = match try_ready!(response.poll()) { - call::MigrateUserResponse { message_month } => message_month, - }; - let webpush = self.data.webpush.as_mut().unwrap(); - webpush.message_month = message_month; - webpush.flags.rotate_message_table = false; - ClientState::Await - } - ClientState::WaitingForRegister(channel_id, ref mut response) => { - debug!("State: WaitingForRegister"); - let msg = match try_ready!(response.poll()) { - call::RegisterResponse::Success { endpoint } => { - self.data.webpush.as_mut().unwrap().stats.registers += 1; - ServerMessage::Register { - channel_id: channel_id, - status: 200, - push_endpoint: endpoint, - } - } - call::RegisterResponse::Error { error_msg, status, .. } => { - debug!("Got unregister fail, error: {}", error_msg); - ServerMessage::Register { - channel_id: channel_id, - status: status, - push_endpoint: "".into(), - } - } - }; - let next_state = if self.data.unacked_messages() { - ClientState::Await - } else { - ClientState::Await - }; - ClientState::FinishSend(Some(msg), Some(Box::new(next_state))) - } - ClientState::WaitingForUnRegister(channel_id, ref mut response) => { - debug!("State: WaitingForUnRegister"); - let msg = match try_ready!(response.poll()) { - call::UnRegisterResponse::Success { success } => { - debug!("Got the unregister response"); - self.data.webpush.as_mut().unwrap().stats.unregisters += 1; - ServerMessage::Unregister { - channel_id: channel_id, - status: if success { 200 } else { 500 }, - } - } - call::UnRegisterResponse::Error { error_msg, status, .. } => { - debug!("Got unregister fail, error: {}", error_msg); - ServerMessage::Unregister { channel_id, status } - } - }; - let next_state = if self.data.unacked_messages() { - ClientState::Await - } else { - ClientState::Await - }; - ClientState::FinishSend(Some(msg), Some(Box::new(next_state))) - } - ClientState::WaitingForDelete(ref mut response) => { - debug!("State: WaitingForDelete"); - try_ready!(response.poll()); - ClientState::Await - } - ClientState::WaitingForDropUser(ref mut response) => { - debug!("State: WaitingForDropUser"); - try_ready!(response.poll()); - ClientState::Done - } - ClientState::Await => { - debug!("State: Await"); - if let Some(next_state) = self.data.determine_acked_state() { - return Ok(next_state.into()); - } - match try_ready!(self.data.input_or_notif()) { - Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { - let webpush = self.data.webpush.as_mut().unwrap(); - let service_delta = self.data.srv.client_service_add_service( - &mut webpush.broadcast_services, - &Service::from_hashmap(broadcasts), - ); - if let Some(delta) = service_delta { - ClientState::FinishSend( - Some(ServerMessage::Broadcast { - broadcasts: Service::into_hashmap(delta) - }), - Some(Box::new(ClientState::Await)), - ) - } else { - ClientState::Await - } - } - Either::A(ClientMessage::Register { channel_id, key }) => { - self.data.process_register(channel_id, key) - } - Either::A(ClientMessage::Unregister { channel_id, code }) => { - self.data.process_unregister(channel_id, code) - } - Either::A(ClientMessage::Nack { .. }) => { - self.data.srv.metrics.incr("ua.command.nack").ok(); - self.data.webpush.as_mut().unwrap().stats.nacks += 1; - ClientState::Await - } - Either::A(ClientMessage::Ack { updates }) => self.data.process_acks(updates), - Either::B(ServerNotification::Notification(notif)) => { - let webpush = self.data.webpush.as_mut().unwrap(); - if notif.ttl != 0 { - webpush.unacked_direct_notifs.push(notif.clone()); - } - debug!("Got a notification to send, sending!"); - ClientState::FinishSend( - Some(ServerMessage::Notification(notif)), - Some(Box::new(ClientState::Await)), - ) - } - Either::B(ServerNotification::CheckStorage) => { - let webpush = self.data.webpush.as_mut().unwrap(); - webpush.flags.include_topic = true; - webpush.flags.check = true; - ClientState::Await - } - Either::B(ServerNotification::Disconnect) => { - debug!("Got told to disconnect, connecting client has our uaid"); - ClientState::ShutdownCleanup(Some("Repeat UAID disconnect".into())) - } - _ => return Err("Invalid message".into()), - } - } - ClientState::ShutdownCleanup(ref mut err) => { - debug!("State: ShutdownCleanup"); - if let Some(err_obj) = err.take() { - let mut error = err_obj.to_string(); - for err in err_obj.iter().skip(1) { - error.push_str("\n"); - error.push_str(&err.to_string()); - } - debug!("Error for shutdown of {}: {}", host, error); - }; - self.data.shutdown(); - ClientState::Done - } - ClientState::Done => { - // We don't expect this to actually run, as this state will exit - // the transition. Included for exhaustive matching. - debug!("State: Done"); - ClientState::Done - } - }; - Ok(next_state.into()) - } + #[state_machine_future(transitions(UnAuthDone))] + AwaitSessionComplete { + auth_state_machine: AuthClientStateFuture, + srv: Rc, + user_agent: String, + webpush: Rc>, + }, + + #[state_machine_future(ready)] + UnAuthDone(()), + + #[state_machine_future(error)] + UnAuthClientStateError(Error), } -impl ClientData +impl PollUnAuthClientState for UnAuthClientState where T: Stream + Sink + 'static, { - fn input_with_timeout(&mut self, timeout: &mut Timeout) -> Poll { - let item = match timeout.poll()? { - Async::Ready(_) => return Err("Client timed out".into()), - Async::NotReady => { - match self.ws.poll()? { - Async::Ready(None) => return Err("Client dropped".into()), - Async::Ready(Some(msg)) => Async::Ready(msg), - Async::NotReady => Async::NotReady, - } + fn poll_await_hello<'a>( + hello: &'a mut RentToOwn<'a, AwaitHello>, + ) -> Poll, Error> { + debug!("State: AwaitHello"); + let (uaid, services) = { + let AwaitHello { + ref mut data, + ref mut timeout, + .. + } = **hello; + match try_ready!(data.input_with_timeout(timeout)) { + ClientMessage::Hello { + uaid, + use_webpush: Some(true), + broadcasts, + .. + } => ( + uaid.and_then(|uaid| Uuid::parse_str(uaid.as_str()).ok()), + Service::from_hashmap(broadcasts.unwrap_or(HashMap::new())), + ), + _ => return Err("Invalid message, must be hello".into()), } }; - Ok(item) + + let AwaitHello { data, tx, rx, .. } = hello.take(); + let connected_at = time::precise_time_ns() / 1000; + transition!(AwaitProcessHello { + response: data.srv.hello(&connected_at, uaid.as_ref()), + data, + interested_broadcasts: services, + tx, + rx, + }) } - fn input_or_notif(&mut self) -> Poll, Error> { - let webpush = self.webpush.as_mut().unwrap(); - let item = match webpush.rx.poll() { - Ok(Async::Ready(Some(notif))) => Either::B(notif), - Ok(Async::Ready(None)) => return Err("Sending side dropped".into()), - Ok(Async::NotReady) => { - match self.ws.poll()? { - Async::Ready(None) => return Err("Client dropped".into()), - Async::Ready(Some(msg)) => Either::A(msg), - Async::NotReady => return Ok(Async::NotReady), + fn poll_await_process_hello<'a>( + process_hello: &'a mut RentToOwn<'a, AwaitProcessHello>, + ) -> Poll, Error> { + debug!("State: AwaitProcessHello"); + let (uaid, message_month, check_storage, reset_uaid, rotate_message_table, connected_at) = { + match try_ready!(process_hello.response.poll()) { + call::HelloResponse { + uaid: Some(uaid), + message_month, + check_storage, + reset_uaid, + rotate_message_table, + connected_at, + } => ( + uaid, + message_month, + check_storage, + reset_uaid, + rotate_message_table, + connected_at, + ), + call::HelloResponse { uaid: None, .. } => { + return Err("Already connected elsewhere".into()) } } - Err(_) => return Err("Unexpected error".into()), }; - Ok(Async::Ready(item)) - } - fn process_hello( - &mut self, - uaid: Uuid, - message_month: String, - reset_uaid: bool, - rotate_message_table: bool, - check_storage: bool, - connected_at: u64, - services: &Vec, - ) -> ClientState { - let (tx, rx) = mpsc::unbounded(); + let AwaitProcessHello { + data, + interested_broadcasts, + tx, + rx, + .. + } = process_hello.take(); + let UnAuthClientData { + srv, + ws, + user_agent, + host, + broadcast_services, + } = data; + + // Setup the objects and such needed for a WebPushClient let mut flags = ClientFlags::new(); flags.check = check_storage; flags.reset_uaid = reset_uaid; flags.rotate_message_table = rotate_message_table; - - let ServiceClientInit(client_services, broadcasts) = self.srv.broadcast_init(services); + let ServiceClientInit(client_services, broadcasts) = + srv.broadcast_init(&interested_broadcasts); + broadcast_services.replace(client_services); let uid = Uuid::new_v4(); - self.webpush = Some(WebPushClient { + let webpush = Rc::new(RefCell::new(WebPushClient { uaid, uid: uid.clone(), - broadcast_services: client_services, flags, rx, message_month, - unacked_direct_notifs: Vec::new(), - unacked_stored_notifs: Vec::new(), - unacked_stored_highest: None, connected_at, stats: SessionStatistics { uaid: uaid.hyphenated().to_string(), uaid_reset: reset_uaid, existing_uaid: check_storage, connection_type: String::from("webpush"), - host: self.host.clone(), - direct_acked: 0, - direct_storage: 0, - stored_retrieved: 0, - stored_acked: 0, - nacks: 0, - registers: 0, - unregisters: 0, + host: host.clone(), + ..Default::default() }, - }); - self.srv.connect_client( - RegisteredClient { uaid: uaid, uid: uid, tx: tx }, - ); + ..Default::default() + })); + srv.connect_client(RegisteredClient { uaid, uid, tx }); + let response = ServerMessage::Hello { uaid: uaid.hyphenated().to_string(), status: 200, use_webpush: Some(true), broadcasts: Service::into_hashmap(broadcasts), }; - ClientState::FinishSend(Some(response), Some(Box::new(ClientState::Await))) - } - - fn process_register(&mut self, channel_id: Uuid, key: Option) -> ClientState { - debug!("Got a register command"; "channel_id" => channel_id.hyphenated().to_string()); - let webpush = self.webpush.as_ref().unwrap(); - let uaid = webpush.uaid.clone(); - let message_month = webpush.message_month.clone(); - let channel_id_str = channel_id.hyphenated().to_string(); - let fut = self.srv.register( - uaid.simple().to_string(), - message_month, - channel_id_str, - key, - ); - ClientState::WaitingForRegister(channel_id, fut) - } - - fn process_unregister(&mut self, channel_id: Uuid, code: Option) -> ClientState { - debug!("Got a unregister command"); - let webpush = self.webpush.as_ref().unwrap(); - let uaid = webpush.uaid.clone(); - let message_month = webpush.message_month.clone(); - let channel_id_str = channel_id.hyphenated().to_string(); - let fut = self.srv.unregister( - uaid.simple().to_string(), - message_month, - channel_id_str, - code.unwrap_or(200), + let auth_state_machine = AuthClientState::start( + vec![response], + false, + AuthClientData { + srv: srv.clone(), + ws, + webpush: webpush.clone(), + broadcast_services: broadcast_services.clone(), + }, ); - ClientState::WaitingForUnRegister(channel_id, fut) - } - - fn process_increment_storage(&mut self) -> ClientState { - // Let the variable copies begin, so that nothing is left dangling in the future - // we then assemble because Lifetimes. - let webpush = self.webpush.as_ref().unwrap(); - let timestamp = webpush.unacked_stored_highest.unwrap().to_string(); - let uaid = webpush.uaid.simple().to_string(); - let month_name = webpush.message_month.clone(); - let srv = self.srv.clone(); - let ddb_call = retry_if(move || { - let expiry = (time::get_time().sec as u64) + MAX_EXPIRY; - let mut attr_values = HashMap::new(); - attr_values.insert(":timestamp".to_string(), AttributeValue { - n: Some(timestamp.clone()), - ..Default::default() - }); - attr_values.insert(":expiry".to_string(), AttributeValue { - n: Some(expiry.to_string()), - ..Default::default() - }); - srv.ddb_client.update_item(&UpdateItemInput { - key: ddb_item! { - uaid: s => uaid.clone(), - chidmessageid: s => " ".to_string() - }, - update_expression: Some("SET current_timestamp=:timestamp, expiry=:expiry".to_string()), - expression_attribute_values: Some(attr_values), - table_name: month_name.clone(), - ..Default::default() - }) - }, |err: &UpdateItemError| { - matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)) + transition!(AwaitSessionComplete { + auth_state_machine, + srv, + user_agent, + webpush, }) - .map_err(|_| "Error incrementing storage".into()); - ClientState::WaitingForIncrementStorage(Box::new(ddb_call)) - } - - fn process_acks(&mut self, updates: Vec) -> ClientState { - self.srv.metrics.incr("ua.command.ack").ok(); - let webpush = self.webpush.as_mut().unwrap(); - let mut fut: Option> = None; - for notif in updates.iter() { - if let Some(pos) = webpush.unacked_direct_notifs.iter().position(|v| { - v.channel_id == notif.channel_id && v.version == notif.version - }) - { - webpush.stats.direct_acked += 1; - webpush.unacked_direct_notifs.remove(pos); - continue; - }; - if let Some(pos) = webpush.unacked_stored_notifs.iter().position(|v| { - v.channel_id == notif.channel_id && v.version == notif.version - }) - { - webpush.stats.stored_acked += 1; - let message_month = webpush.message_month.clone(); - let n = webpush.unacked_stored_notifs.remove(pos); - // Topic/legacy messages have no sortkey_timestamp - if n.sortkey_timestamp.is_none() { - if fut.is_none() { - fut = Some(self.srv.delete_message(message_month, n)) - } else { - let my_fut = self.srv.delete_message(message_month, n); - fut = Some(Box::new(fut.take().unwrap().and_then(move |_| my_fut))); - } - } - continue; - }; - } - if let Some(my_fut) = fut { - ClientState::WaitingForDelete(my_fut) - } else { - ClientState::Await - } } - // Called from Await to determine any needed state changes - fn determine_acked_state(&mut self) -> Option { - let webpush = self.webpush.as_ref().unwrap(); - let all_acked = !self.unacked_messages(); - if all_acked && webpush.flags.check && webpush.flags.increment_storage { - Some(ClientState::IncrementStorage) - } else if all_acked && webpush.flags.check { - Some(ClientState::CheckStorage) - } else if all_acked && webpush.flags.rotate_message_table { - Some(ClientState::WaitingForMigrateUser(self.srv.migrate_user( - webpush.uaid.simple().to_string(), - webpush.message_month.clone(), - ))) - } else if all_acked && webpush.flags.reset_uaid { - Some(ClientState::WaitingForDropUser( - self.srv.drop_user(webpush.uaid.simple().to_string()), - )) - } else if all_acked && webpush.flags.none() { - // Explicit call-out that this condition results in no state change. - None - } else { - None - } - } - - fn unacked_messages(&self) -> bool { - self.webpush.as_ref().unwrap().unacked_messages() - } - - pub fn shutdown(&mut self) { - // If we made it past hello, do more cleanup - let mut webpush = match self.webpush.take() { - Some(webpush) => webpush, - None => return + fn poll_await_session_complete<'a>( + session_complete: &'a mut RentToOwn<'a, AwaitSessionComplete>, + ) -> Poll { + // xxx: handle error cases with maybe a log message? + let _error = { + match session_complete.auth_state_machine.poll() { + Ok(Async::Ready(_)) => None, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => Some(e), + } }; + let AwaitSessionComplete { + srv, + user_agent, + webpush, + .. + } = session_complete.take(); + let mut webpush = webpush.borrow_mut(); + // If there's any notifications in the queue, move them to our unacked direct notifs + webpush.rx.close(); + loop { + match webpush.rx.poll() { + Ok(Async::Ready(Some(msg))) => match msg { + ServerNotification::CheckStorage => continue, + ServerNotification::Notification(notif) => { + webpush.unacked_direct_notifs.push(notif) + } + ServerNotification::Disconnect => continue, + }, + Ok(Async::Ready(None)) => break, + Ok(Async::NotReady) => break, + Err(_) => break, + } + } let now = time::precise_time_ns() / 1000; let elapsed = now - webpush.connected_at; - let parser = Parser::new(); - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(&parser, &self.user_agent); - self.srv.metrics + let (ua_result, metrics_os, metrics_browser) = parse_user_agent(&parser, &user_agent); + srv.metrics .time_with_tags("ua.connection.lifespan", elapsed) .with_tag("ua_os_family", metrics_os) .with_tag("ua_browser_family", metrics_browser) @@ -773,86 +490,558 @@ where .send() .ok(); - // If there's any notifications in the queue, move them to our unacked direct notifs - webpush.rx.close(); - // wait() is ok as the queue is closed at this point. - let rx_iter = webpush.rx.wait(); - for msg in rx_iter { - match msg { - Ok(ServerNotification::CheckStorage) => continue, - Ok(ServerNotification::Notification(notif)) => { - webpush.unacked_direct_notifs.push(notif); - } - Ok(ServerNotification::Disconnect) => continue, - Err(_) => continue, - } - } - // If there's direct unack'd messages, they need to be saved out without blocking // here - self.srv.disconnet_client(&webpush.uaid, &webpush.uid); - let mut stats = webpush.stats; + srv.disconnet_client(&webpush.uaid, &webpush.uid); + let mut stats = webpush.stats.clone(); let unacked_direct_notifs = webpush.unacked_direct_notifs.len(); if unacked_direct_notifs > 0 { stats.direct_storage += unacked_direct_notifs as i32; - self.srv.handle.spawn( - self.srv - .store_messages( - webpush.uaid.simple().to_string(), - webpush.message_month, - webpush.unacked_direct_notifs, - ) - .then(|_| { - debug!("Finished saving unacked direct notifications"); - Ok(()) - }), - ) + let notifs = mem::replace(&mut webpush.unacked_direct_notifs, Vec::new()); + srv.handle.spawn(srv.store_messages( + webpush.uaid.simple().to_string(), + webpush.message_month.clone(), + notifs, + ).then(|_| { + debug!("Finished saving unacked direct notifications"); + Ok(()) + })) } // Log out the final stats message info!("Session"; - "uaid_hash" => &stats.uaid, - "uaid_reset" => stats.uaid_reset, - "existing_uaid" => stats.existing_uaid, - "connection_type" => &stats.connection_type, - "host" => &stats.host, - "ua_name" => ua_result.name, - "ua_os_family" => ua_result.os, - "ua_os_ver" => ua_result.os_version, - "ua_browser_family" => ua_result.vendor, - "ua_browser_ver" => ua_result.version, - "ua_category" => ua_result.category, - "connection_time" => elapsed, - "direct_acked" => stats.direct_acked, - "direct_storage" => stats.direct_storage, - "stored_retrieved" => stats.stored_retrieved, - "stored_acked" => stats.stored_acked, - "nacks" => stats.nacks, - "registers" => stats.registers, - "unregisters" => stats.unregisters, + "uaid_hash" => &stats.uaid, + "uaid_reset" => stats.uaid_reset, + "existing_uaid" => stats.existing_uaid, + "connection_type" => &stats.connection_type, + "host" => &stats.host, + "ua_name" => ua_result.name, + "ua_os_family" => ua_result.os, + "ua_os_ver" => ua_result.os_version, + "ua_browser_family" => ua_result.vendor, + "ua_browser_ver" => ua_result.version, + "ua_category" => ua_result.category, + "connection_time" => elapsed, + "direct_acked" => stats.direct_acked, + "direct_storage" => stats.direct_storage, + "stored_retrieved" => stats.stored_retrieved, + "stored_acked" => stats.stored_acked, + "nacks" => stats.nacks, + "registers" => stats.registers, + "unregisters" => stats.unregisters, ); + transition!(UnAuthDone(())) } } -impl Future for Client +#[derive(StateMachineFuture)] +pub enum AuthClientState where T: Stream + Sink + 'static, { - type Item = (); - type Error = Error; + #[state_machine_future(start, transitions(DetermineAck, SendThenWait))] + SendThenWait { + remaining_data: Vec, + poll_complete: bool, + data: AuthClientData, + }, - fn poll(&mut self) -> Poll<(), Error> { - loop { - if let ClientState::Done = self.state { - return Ok(().into()); + #[state_machine_future(transitions(IncrementStorage, CheckStorage, AwaitDropUser, + AwaitMigrateUser, AwaitInput))] + DetermineAck { data: AuthClientData }, + + #[state_machine_future(transitions(DetermineAck, SendThenWait, AwaitInput, AwaitRegister, + AwaitUnregister, AwaitDelete))] + AwaitInput { data: AuthClientData }, + + #[state_machine_future(transitions(AwaitIncrementStorage))] + IncrementStorage { data: AuthClientData }, + + #[state_machine_future(transitions(DetermineAck))] + AwaitIncrementStorage { + ddb_response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(transitions(AwaitCheckStorage))] + CheckStorage { data: AuthClientData }, + + #[state_machine_future(transitions(SendThenWait, DetermineAck))] + AwaitCheckStorage { + response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(transitions(DetermineAck))] + AwaitMigrateUser { + response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(transitions(AuthDone))] + AwaitDropUser { + response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(transitions(SendThenWait))] + AwaitRegister { + channel_id: Uuid, + response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(transitions(SendThenWait))] + AwaitUnregister { + channel_id: Uuid, + response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(transitions(DetermineAck))] + AwaitDelete { + response: MyFuture, + data: AuthClientData, + }, + + #[state_machine_future(ready)] + AuthDone(()), + + #[state_machine_future(error)] + AuthClientStateError(Error), +} + +impl PollAuthClientState for AuthClientState +where + T: Stream + + Sink + + 'static, +{ + fn poll_send_then_wait<'a>( + send: &'a mut RentToOwn<'a, SendThenWait>, + ) -> Poll, Error> { + let start_send = { + let SendThenWait { + ref mut remaining_data, + poll_complete, + ref mut data, + .. + } = **send; + if poll_complete { + try_ready!(data.ws.poll_complete()); + false + } else if remaining_data.len() > 0 { + let item = remaining_data.remove(0); + let ret = data.ws.start_send(item).chain_err(|| "unable to send")?; + match ret { + AsyncSink::Ready => true, + AsyncSink::NotReady(returned) => { + remaining_data.insert(0, returned); + return Ok(Async::NotReady); + } + } + } else { + false } - match self.transition() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(next_state)) => self.state = next_state, - Err(e) => self.state = ClientState::ShutdownCleanup(Some(e)), + }; + + let SendThenWait { data, remaining_data, .. } = send.take(); + if start_send { + transition!(SendThenWait { + remaining_data, + poll_complete: true, + data, + }); + } else if remaining_data.len() > 0 { + transition!(SendThenWait { + remaining_data, + poll_complete: false, + data, + }); + } + transition!(DetermineAck { data }) + } + + fn poll_determine_ack<'a>( + detack: &'a mut RentToOwn<'a, DetermineAck>, + ) -> Poll, Error> { + let DetermineAck { data } = detack.take(); + let webpush_rc = data.webpush.clone(); + let webpush = webpush_rc.borrow(); + let all_acked = !webpush.unacked_messages(); + if all_acked && webpush.flags.check && webpush.flags.increment_storage { + transition!(IncrementStorage { data }); + } else if all_acked && webpush.flags.check { + transition!(CheckStorage { data }); + } else if all_acked && webpush.flags.rotate_message_table { + let response = data.srv.migrate_user( + webpush.uaid.simple().to_string(), + webpush.message_month.clone(), + ); + transition!(AwaitMigrateUser { response, data }); + } else if all_acked && webpush.flags.reset_uaid { + let response = data.srv.drop_user(webpush.uaid.simple().to_string()); + transition!(AwaitDropUser { response, data }); + } + transition!(AwaitInput { data }) + } + + fn poll_await_input<'a>( + await: &'a mut RentToOwn<'a, AwaitInput>, + ) -> Poll, Error> { + let input = try_ready!(await.data.input_or_notif()); + let AwaitInput { data } = await.take(); + let webpush_rc = data.webpush.clone(); + let mut webpush = webpush_rc.borrow_mut(); + match input { + Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { + let service_delta = { + let mut broadcast_services = data.broadcast_services.borrow_mut(); + data.srv.client_service_add_service( + &mut broadcast_services, + &Service::from_hashmap(broadcasts), + ) + }; + if let Some(delta) = service_delta { + transition!(SendThenWait { + remaining_data: vec![ServerMessage::Broadcast { + broadcasts: Service::into_hashmap(delta), + }], + poll_complete: false, + data, + }); + } else { + transition!(AwaitInput { data }); + } + } + Either::A(ClientMessage::Register { channel_id, key }) => { + debug!("Got a register command"; + "channel_id" => channel_id.hyphenated().to_string()); + let uaid = webpush.uaid.clone(); + let message_month = webpush.message_month.clone(); + let channel_id_str = channel_id.hyphenated().to_string(); + let fut = data.srv.register( + uaid.simple().to_string(), + message_month, + channel_id_str, + key, + ); + transition!(AwaitRegister { + channel_id, + response: fut, + data, + }); + } + Either::A(ClientMessage::Unregister { channel_id, code }) => { + debug!("Got a unregister command"); + let uaid = webpush.uaid.clone(); + let message_month = webpush.message_month.clone(); + let channel_id_str = channel_id.hyphenated().to_string(); + let fut = data.srv.unregister( + uaid.simple().to_string(), + message_month, + channel_id_str, + code.unwrap_or(200), + ); + transition!(AwaitUnregister { + channel_id, + response: fut, + data, + }); + } + Either::A(ClientMessage::Nack { .. }) => { + data.srv.metrics.incr("ua.command.nack").ok(); + webpush.stats.nacks += 1; + transition!(AwaitInput { data }); + } + Either::A(ClientMessage::Ack { updates }) => { + data.srv.metrics.incr("ua.command.ack").ok(); + let mut fut: Option> = None; + for notif in updates.iter() { + if let Some(pos) = webpush.unacked_direct_notifs.iter().position(|v| { + v.channel_id == notif.channel_id && v.version == notif.version + }) { + webpush.stats.direct_acked += 1; + webpush.unacked_direct_notifs.remove(pos); + continue; + }; + if let Some(pos) = webpush.unacked_stored_notifs.iter().position(|v| { + v.channel_id == notif.channel_id && v.version == notif.version + }) { + webpush.stats.stored_acked += 1; + let message_month = webpush.message_month.clone(); + let n = webpush.unacked_stored_notifs.remove(pos); + // Topic/legacy messages have no sortkey_timestamp + if n.sortkey_timestamp.is_none() { + fut = if let Some(call) = fut { + let my_fut = data.srv.delete_message(message_month, n); + Some(Box::new(call.and_then(move |_| my_fut))) + } else { + Some(data.srv.delete_message(message_month, n)) + } + } + continue; + }; + } + if let Some(my_fut) = fut { + transition!(AwaitDelete { + response: my_fut, + data, + }); + } else { + transition!(DetermineAck { data }); + } + } + Either::B(ServerNotification::Notification(notif)) => { + if notif.ttl != 0 { + webpush.unacked_direct_notifs.push(notif.clone()); + } + debug!("Got a notification to send, sending!"); + transition!(SendThenWait { + remaining_data: vec![ServerMessage::Notification(notif)], + poll_complete: false, + data, + }); + } + Either::B(ServerNotification::CheckStorage) => { + webpush.flags.include_topic = true; + webpush.flags.check = true; + transition!(DetermineAck { data }); + } + Either::B(ServerNotification::Disconnect) => { + debug!("Got told to disconnect, connecting client has our uaid"); + return Err("Repeat UAID disconnect".into()); + } + _ => return Err("Invalid message".into()), + } + } + + fn poll_increment_storage<'a>( + increment_storage: &'a mut RentToOwn<'a, IncrementStorage>, + ) -> Poll, Error> { + debug!("State: IncrementStorage"); + let webpush_rc = increment_storage.data.webpush.clone(); + let webpush = webpush_rc.borrow(); + let timestamp = webpush + .unacked_stored_highest + .ok_or("unacked_stored_highest unset")? + .to_string(); + let uaid = webpush.uaid.simple().to_string(); + let month_name = webpush.message_month.clone(); + let srv = increment_storage.data.srv.clone(); + let ddb_response = retry_if( + move || { + let expiry = (time::get_time().sec as u64) + MAX_EXPIRY; + let mut attr_values = HashMap::new(); + attr_values.insert( + ":timestamp".to_string(), + AttributeValue { + n: Some(timestamp.clone()), + ..Default::default() + }, + ); + attr_values.insert( + ":expiry".to_string(), + AttributeValue { + n: Some(expiry.to_string()), + ..Default::default() + }, + ); + srv.ddb_client.update_item(&UpdateItemInput { + key: ddb_item! { + uaid: s => uaid.clone(), + chidmessageid: s => " ".to_string() + }, + update_expression: Some( + "SET current_timestamp=:timestamp, expiry=:expiry".to_string(), + ), + expression_attribute_values: Some(attr_values), + table_name: month_name.clone(), + ..Default::default() + }) + }, + |err: &UpdateItemError| { + matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)) + }, + ).map_err(|_| "Error incrementing storage".into()); + + transition!(AwaitIncrementStorage { + ddb_response: Box::new(ddb_response), + data: increment_storage.take().data, + }) + } + + fn poll_await_increment_storage<'a>( + await_increment_storage: &'a mut RentToOwn<'a, AwaitIncrementStorage>, + ) -> Poll, Error> { + debug!("State: AwaitIncrementStorage"); + try_ready!(await_increment_storage.ddb_response.poll()); + let AwaitIncrementStorage { data, .. } = await_increment_storage.take(); + let webpush = data.webpush.clone(); + webpush.borrow_mut().flags.increment_storage = false; + transition!(DetermineAck { data }) + } + + fn poll_check_storage<'a>( + check_storage: &'a mut RentToOwn<'a, CheckStorage>, + ) -> Poll, Error> { + debug!("State: CheckStorage"); + let CheckStorage { data } = check_storage.take(); + let response = { + let webpush = data.webpush.borrow(); + data.srv.check_storage( + webpush.uaid.simple().to_string(), + webpush.message_month.clone(), + webpush.flags.include_topic, + webpush.unacked_stored_highest, + ) + }; + transition!(AwaitCheckStorage { response, data }) + } + + fn poll_await_check_storage<'a>( + await_check_storage: &'a mut RentToOwn<'a, AwaitCheckStorage>, + ) -> Poll, Error> { + debug!("State: AwaitCheckStorage"); + let (include_topic, mut messages, timestamp) = + match try_ready!(await_check_storage.response.poll()) { + call::CheckStorageResponse { + include_topic, + messages, + timestamp, + } => (include_topic, messages, timestamp), }; + debug!("Got checkstorage response"); + + let AwaitCheckStorage { data, .. } = await_check_storage.take(); + let webpush_rc = data.webpush.clone(); + let mut webpush = webpush_rc.borrow_mut(); + webpush.flags.include_topic = include_topic; + debug!("Setting unacked stored highest to {:?}", timestamp); + webpush.unacked_stored_highest = timestamp; + if messages.len() > 0 { + // Filter out TTL expired messages + let now = time::get_time().sec as u32; + messages.retain(|ref msg| now < msg.ttl + msg.timestamp); + webpush.flags.increment_storage = !include_topic && timestamp.is_some(); + // If there's still messages send them out + if messages.len() > 0 { + webpush + .unacked_stored_notifs + .extend(messages.iter().cloned()); + transition!(SendThenWait { + remaining_data: messages.into_iter().map(ServerMessage::Notification).collect(), + poll_complete: false, + data, + }) + } else { + // No messages remaining + transition!(DetermineAck { data }) + } + } else { + webpush.flags.check = false; + transition!(DetermineAck { data }) } } + + fn poll_await_migrate_user<'a>( + await_migrate_user: &'a mut RentToOwn<'a, AwaitMigrateUser>, + ) -> Poll, Error> { + debug!("State: AwaitMigrateUser"); + let message_month = match try_ready!(await_migrate_user.response.poll()) { + call::MigrateUserResponse { message_month } => message_month, + }; + let AwaitMigrateUser { data, .. } = await_migrate_user.take(); + { + let mut webpush = data.webpush.borrow_mut(); + webpush.message_month = message_month; + webpush.flags.rotate_message_table = false; + } + transition!(DetermineAck { data }) + } + + fn poll_await_drop_user<'a>( + await_drop_user: &'a mut RentToOwn<'a, AwaitDropUser>, + ) -> Poll { + debug!("State: AwaitDropUser"); + try_ready!(await_drop_user.response.poll()); + transition!(AuthDone(())) + } + + fn poll_await_register<'a>( + await_register: &'a mut RentToOwn<'a, AwaitRegister>, + ) -> Poll, Error> { + debug!("State: AwaitRegister"); + let msg = match try_ready!(await_register.response.poll()) { + call::RegisterResponse::Success { endpoint } => { + let mut webpush = await_register.data.webpush.borrow_mut(); + webpush.stats.registers += 1; + ServerMessage::Register { + channel_id: await_register.channel_id, + status: 200, + push_endpoint: endpoint, + } + } + call::RegisterResponse::Error { + error_msg, status, .. + } => { + debug!("Got unregister fail, error: {}", error_msg); + ServerMessage::Register { + channel_id: await_register.channel_id, + status: status, + push_endpoint: "".into(), + } + } + }; + + transition!(SendThenWait { + remaining_data: vec![msg], + poll_complete: false, + data: await_register.take().data, + }) + } + + fn poll_await_unregister<'a>( + await_unregister: &'a mut RentToOwn<'a, AwaitUnregister>, + ) -> Poll, Error> { + debug!("State: AwaitUnRegister"); + let msg = match try_ready!(await_unregister.response.poll()) { + call::UnRegisterResponse::Success { success } => { + debug!("Got the unregister response"); + let mut webpush = await_unregister.data.webpush.borrow_mut(); + webpush.stats.unregisters += 1; + ServerMessage::Unregister { + channel_id: await_unregister.channel_id, + status: if success { 200 } else { 500 }, + } + } + call::UnRegisterResponse::Error { + error_msg, status, .. + } => { + debug!("Got unregister fail, error: {}", error_msg); + ServerMessage::Unregister { + channel_id: await_unregister.channel_id, + status, + } + } + }; + + transition!(SendThenWait { + remaining_data: vec![msg], + poll_complete: false, + data: await_unregister.take().data, + }) + } + + fn poll_await_delete<'a>( + await_delete: &'a mut RentToOwn<'a, AwaitDelete>, + ) -> Poll, Error> { + debug!("State: AwaitDelete"); + try_ready!(await_delete.response.poll()); + transition!(DetermineAck { + data: await_delete.take().data, + }) + } } diff --git a/autopush_rs/src/lib.rs b/autopush_rs/src/lib.rs index 3947649f..213f2d6c 100644 --- a/autopush_rs/src/lib.rs +++ b/autopush_rs/src/lib.rs @@ -90,6 +90,8 @@ extern crate slog_json; extern crate slog_stdlog; #[macro_use] extern crate serde_json; +#[macro_use] +extern crate state_machine_future; extern crate time; extern crate tokio_core; extern crate tokio_io; diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index f7ff027e..eab365f3 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -17,6 +17,12 @@ pub enum ServerNotification { Disconnect, } +impl Default for ServerNotification { + fn default() -> ServerNotification { + ServerNotification::Disconnect + } +} + #[derive(Deserialize)] #[serde(tag = "messageType", rename_all = "snake_case")] pub enum ClientMessage { @@ -94,7 +100,7 @@ pub enum ServerMessage { Notification(Notification), } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Default, Deserialize, Clone, Debug)] pub struct Notification { pub uaid: Option, #[serde(rename = "channelID")] diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 8000c2f5..bd1e1547 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -729,11 +729,11 @@ impl Future for PingManager { break; } } - assert!(!socket.ping); + debug_assert!(!socket.ping); match self.waiting { WaitingFor::SendPing => { - assert!(!socket.pong_timeout); - assert!(!socket.pong_received); + debug_assert!(!socket.pong_timeout); + debug_assert!(!socket.pong_received); match self.timeout.poll()? { Async::Ready(()) => { debug!("scheduling a ping to get sent"); @@ -747,7 +747,7 @@ impl Future for PingManager { // If we received a pong, then switch us back to waiting // to send out a ping debug!("pong received, going back to sending a ping"); - assert!(!socket.pong_timeout); + debug_assert!(!socket.pong_timeout); let at = Instant::now() + self.srv.opts.auto_ping_interval; self.timeout.reset(at); self.waiting = WaitingFor::SendPing; @@ -772,7 +772,7 @@ impl Future for PingManager { } } WaitingFor::Close => { - assert!(!socket.pong_timeout); + debug_assert!(!socket.pong_timeout); if self.timeout.poll()?.is_ready() { if let CloseState::Exchange(ref mut client) = self.client { client.shutdown(); diff --git a/autopush_rs/src/util/megaphone.rs b/autopush_rs/src/util/megaphone.rs index 47098098..e16b098d 100644 --- a/autopush_rs/src/util/megaphone.rs +++ b/autopush_rs/src/util/megaphone.rs @@ -5,11 +5,11 @@ use std::time::Duration; use reqwest; // A Service entry Key in a ServiceRegistry -#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)] struct ServiceKey(u32); // A list of services that a client is interested in and the last change seen -#[derive(Debug)] +#[derive(Debug, Default)] pub struct ClientServices { service_list: Vec, change_count: u32,