From b5a488068e1cab3a260efc5c809d64f1c1dd28bf Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Thu, 14 Mar 2019 23:42:47 +0800 Subject: [PATCH 01/18] chore: Update tentacle version --- Cargo.lock | 231 ++++++++++++++++++++++++++++++++++++++-- network/Cargo.toml | 7 +- network/src/network.rs | 31 +++--- network/src/protocol.rs | 48 +++------ 4 files changed, 262 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19cc16346b..b8b75a5e3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,8 +483,9 @@ dependencies = [ "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-ping 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle-discovery 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle-ping 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.2 (git+https://github.com/paritytech/unsigned-varint)", @@ -970,6 +971,26 @@ name = "data-encoding" version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "data-encoding-macro" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding-macro-internal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "digest" version = "0.8.0" @@ -992,6 +1013,11 @@ name = "encode_unicode" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "env_logger" version = "0.6.1" @@ -1278,6 +1304,16 @@ dependencies = [ "want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "idna" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "indexmap" version = "1.0.2" @@ -1571,6 +1607,11 @@ dependencies = [ "linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "matches" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "memchr" version = "2.1.3" @@ -1647,6 +1688,11 @@ dependencies = [ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "nibble_vec" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "nix" version = "0.11.0" @@ -1844,6 +1890,19 @@ name = "plain" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "proc-macro-hack" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-hack-impl 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro-hack-impl" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "proc-macro2" version = "0.4.27" @@ -1876,6 +1935,11 @@ name = "quick-error" version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "quote" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quote" version = "0.6.11" @@ -1903,6 +1967,15 @@ dependencies = [ "rusqlite 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "radix_trie" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "endian-type 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "nibble_vec 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rand" version = "0.3.23" @@ -2368,6 +2441,17 @@ dependencies = [ "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "socket2" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "spin" version = "0.5.0" @@ -2411,6 +2495,16 @@ name = "subtle" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "syn" +version = "0.11.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "syn" version = "0.15.26" @@ -2421,6 +2515,14 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "synom" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "synstructure" version = "0.10.1" @@ -2452,7 +2554,7 @@ dependencies = [ [[package]] name = "tentacle" -version = "0.1.0" +version = "0.2.0-alpha.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2467,16 +2569,35 @@ dependencies = [ ] [[package]] -name = "tentacle-ping" +name = "tentacle-discovery" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "generic-channel 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tentacle-ping" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "generic-channel 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2821,6 +2942,49 @@ dependencies = [ "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "trust-dns" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding-macro 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "radix_trie 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns-proto 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "trust-dns-proto" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", + "socket2 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "try-lock" version = "0.1.0" @@ -2859,11 +3023,32 @@ dependencies = [ "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "unicode-bidi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "unicode-normalization" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "smallvec 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "unicode-width" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "unicode-xid" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "unicode-xid" version = "0.1.0" @@ -2896,6 +3081,16 @@ name = "untrusted" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "url" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "utf8-ranges" version = "1.0.2" @@ -3090,9 +3285,12 @@ dependencies = [ "checksum ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "044f882973b245404e90c90e7e42e8ee8d7a64edfd7adf83d684fb97e8e2c1b6" "checksum ctrlc 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "630391922b1b893692c6334369ff528dcc3a9d8061ccf4c803aa8f83cb13db5e" "checksum data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f47ca1860a761136924ddd2422ba77b2ea54fe8cc75b9040804a0d9d32ad97" +"checksum data-encoding-macro 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9b87bf377fc964606c3679c1de6822a9a9d8b69aae2651ca4af28cb2d1550b37" +"checksum data-encoding-macro-internal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "22b318a8d6d56c45df45c61fcc7d2dbf98ea014d4987e7c74ef1f86c9b87e503" "checksum digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05f47366984d3ad862010e22c7ce81a7dbcaebbdfb37241a620f8b6596ee135c" "checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0" "checksum encode_unicode 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "90b2c9496c001e8cb61827acdefad780795c42264c137744cae6f7d9e3450abd" +"checksum endian-type 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" "checksum env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b61fa891024a945da30a9581546e8cfaf5602c7b3f4c137a2805cf388f92075a" "checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" @@ -3123,6 +3321,7 @@ dependencies = [ "checksum humantime 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3ca7e5f2e110db35f93b837c81797f3714500b81d517bf20c431b16d3ca4f114" "checksum hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)" = "34a590ca09d341e94cddf8e5af0bbccde205d5fbc2fa3c09dd67c7f85cea59d7" "checksum hyper 0.12.25 (registry+https://github.com/rust-lang/crates.io-index)" = "7d5b6658b016965ae301fa995306db965c93677880ea70765a84235a96eae896" +"checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum indicatif 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2c60da1c9abea75996b70a931bba6c750730399005b61ccd853cee50ef3d0d0c" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" @@ -3153,6 +3352,7 @@ dependencies = [ "checksum log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c84ec4b527950aa83a329754b01dbe3f58361d1c5efacd1f6d68c494d08a17c6" "checksum lru-cache 0.1.0 (git+https://github.com/nervosnetwork/lru-cache)" = "" "checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" +"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e1dd4eaac298c32ce07eb6ed9242eda7d82955b9170b7d6db59b2e02cc63fcb8" "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" "checksum merkle-cbt 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d589b5a7ca642540e7ccfbca3bcd0aa18693eb9287e2a6b17c79b1d062d52863" @@ -3161,6 +3361,7 @@ dependencies = [ "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" +"checksum nibble_vec 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c8d77f3db4bce033f4d04db08079b2ef1c3d02b44e86f25d08886fafa7756ffa" "checksum nix 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d37e713a259ff641624b6cb20e3b12b2952313ba36b6823c0f16e6cfd9e5de17" "checksum nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "2f9667ddcc6cc8a43afc9b7917599d7216aa09c463919ea32c59ed6cac8bc945" "checksum nom 4.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b30adc557058ce00c9d0d7cb3c6e0b5bc6f36e2e2eabe74b0ba726d194abd588" @@ -3182,12 +3383,16 @@ dependencies = [ "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum plain 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +"checksum proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2c725b36c99df7af7bf9324e9c999b9e37d92c8f8caf106d82e1d7953218d2d8" +"checksum proc-macro-hack-impl 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2b753ad9ed99dd8efeaa7d2fb8453c8f6bc3e54b97966d35f1bc77ca6865254a" "checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915" "checksum proptest 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea66c78d75f2c6e9f304269eaef90899798daecc69f1a625d5a3dd793ff3522" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" +"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cdd8e04bd9c52e0342b406469d494fcb033be4bdbe5c606016defbb1681411e1" "checksum r2d2 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5d746fc8a0dab19ccea7ff73ad535854e90ddb3b4b8cdce953dd5cd0b2e7bd22" "checksum r2d2_sqlite 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7cba990b29ae565b1a765ef45f6b84a89a77736b91582e0243c12f613653857" +"checksum radix_trie 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ebcf72e767017c1aa4b63d4dd0b0b836a243b648fd81d41c6bf6e850ef7a95c7" "checksum rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)" = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" "checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" @@ -3244,18 +3449,22 @@ dependencies = [ "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum smallvec 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "88aea073965ab29f6edb5493faf96ad662fb18aa9eeb186a3b7057951605ed15" "checksum snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "95d697d63d44ad8b78b8d235bf85b34022a78af292c8918527c5f0cffdde7f43" +"checksum socket2 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "c4d11a52082057d87cb5caa31ad812f4504b97ab44732cd8359df2e9ff9f48e7" "checksum spin 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44363f6f51401c34e7be73db0db371c04705d35efbe9f7d6082e03a921a32c55" "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum stream-cipher 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8861bc80f649f5b4c9bd38b696ae9af74499d479dbfb327f0607de6b326a36bc" "checksum string 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b639411d0b9c738748b5397d5ceba08e648f4f1992231aa859af1a017f31f60b" "checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" "checksum subtle 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee" +"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum syn 0.15.26 (registry+https://github.com/rust-lang/crates.io-index)" = "f92e629aa1d9c827b2bb8297046c1ccffc57c99b947a680d3ccff1f136a3bee9" +"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" -"checksum tentacle 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6c29b4a6d8dbf15e8191c4723be562689ec734edc86fa9a84585c256d53e04be" -"checksum tentacle-ping 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "853dcfad34c9521fbb0c1dfc64d363b6efc774aae044a89411a25e3155130e24" +"checksum tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6ca96f03210576ee49f63cca4683b5c3001d0482c6b8c00d6792553f7d9beec5" +"checksum tentacle-discovery 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d458765c30402c4265902cf95ff56a85c11306efd0d2d27776e3e200e310e39c" +"checksum tentacle-ping 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9da20c040b0cf48d93c0c91cbd4c3c2fb77e0e39cbc99f9ee474af15a4a20050" "checksum tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9562b522b885f057e39aac9feb9048e64d4b9cc7f5c0be5933ae6606eb56ad40" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" @@ -3285,18 +3494,24 @@ dependencies = [ "checksum tokio-yamux 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1815b4502bb86c8ffaae8521ecc26e148edb4fcc2ed24c1feba0807a492b4946" "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" "checksum toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "87c5890a989fa47ecdc7bcb4c63a77a82c18f306714104b1decfd722db17b39e" +"checksum trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "65096825b064877da37eeeb9a83390bd23433eabfc503a6476dc5b1949034aa7" +"checksum trust-dns-proto 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09144f0992b0870fa8d2972cc069cbf1e3c0fda64d1f3d45c4d68d0e0b52ad4e" "checksum try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee2aa4715743892880f70885373966c83d73ef1b0838a664ef0c76fffd35e7c2" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum twofish 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712d261e83e727c8e2dbb75dacac67c36e35db36a958ee504f2164fc052434e1" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535c204ee4d8434478593480b8f86ab45ec9aae0e83c568ca81abf0fd0e88f86" "checksum unicase 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9d3218ea14b4edcaccfa0df0a64a3792a2c32cc706f1b336e48867f9d3147f90" +"checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" +"checksum unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "141339a08b982d942be2ca06ff8b076563cbe223d1befd5450716790d44e2426" "checksum unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "882386231c45df4700b275c7ff55b6f3698780a650026380e72dabe76fa46526" +"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum unsigned-varint 0.2.2 (git+https://github.com/paritytech/unsigned-varint)" = "" "checksum unsigned-varint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2c64cdf40b4a9645534a943668681bcb219faf51874d4b65d2e0abda1b10a2ab" "checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f" +"checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" "checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" diff --git a/network/Cargo.toml b/network/Cargo.toml index fb20c1d48b..0d4ff2a07b 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -20,9 +20,10 @@ bytes = "0.4.12" tokio = "0.1.17" futures = { version = "0.1.19", features = ["use_std"] } snap = "0.2" -p2p = { version = "0.1", package="tentacle" } -p2p-ping = { version = "0.1", package="tentacle-ping" } -secio = { version = "0.1", package="tentacle-secio" } +p2p = { version = "0.2.0-alpha.1", package="tentacle" } +secio = { version = "0.1.0", package="tentacle-secio" } +p2p-ping = { version = "0.2.0", package="tentacle-ping" } +p2p-discovery = { version = "0.1.0", package="tentacle-discovery" } faketime = "0.2.0" rusqlite = {version = "0.16.0", features = ["bundled"]} lazy_static = "1.3.0" diff --git a/network/src/network.rs b/network/src/network.rs index bdf9d71efe..bd0d21d1f8 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -26,20 +26,19 @@ use futures::Stream; use log::{debug, error, info, warn}; use multiaddr::multihash::Multihash; use p2p::{ - builder::ServiceBuilder, + builder::{MetaBuilder, ServiceBuilder}, multiaddr::{self, Multiaddr}, secio::{PeerId, PublicKey}, - service::{Service, ServiceError, ServiceEvent}, + service::{DialProtocol, ProtocolHandle, Service, ServiceError, ServiceEvent}, traits::ServiceHandle, }; -use p2p_ping::{Event as PingEvent, PingProtocol}; +use p2p_ping::{Event as PingEvent, PingHandler}; use secio; use std::boxed::Box; use std::cmp::max; use std::sync::Arc; use std::time::{Duration, Instant}; use std::usize; -use tokio::codec::LengthDelimitedCodec; const PING_PROTOCOL_ID: ProtocolId = 0; @@ -80,7 +79,7 @@ impl PeerInfo { } } -type P2PService = Service; +type P2PService = Service; pub struct Network { pub(crate) peers_registry: RwLock, @@ -313,7 +312,7 @@ impl Network { } pub fn dial_addr(&self, addr: Multiaddr) { - if let Err(err) = self.p2p_control.write().dial(addr) { + if let Err(err) = self.p2p_control.write().dial(addr, DialProtocol::All) { error!(target: "network", "failed to dial: {}", err); } } @@ -372,14 +371,20 @@ impl Network { let mut p2p_service = ServiceBuilder::default().forever(true); // register protocols let (ping_sender, ping_receiver) = channel(std::u8::MAX as usize); - p2p_service = p2p_service.insert_protocol(PingProtocol::new( - PING_PROTOCOL_ID, - config.ping_interval, - config.ping_timeout, - ping_sender, - )); + let ping_meta = MetaBuilder::default() + .id(PING_PROTOCOL_ID) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(PingHandler::new( + PING_PROTOCOL_ID, + config.ping_interval, + config.ping_timeout, + ping_sender, + ))) + }) + .build(); + p2p_service = p2p_service.insert_protocol(ping_meta); for (ckb_protocol, _) in &ckb_protocols { - p2p_service = p2p_service.insert_protocol(ckb_protocol.clone()); + p2p_service = p2p_service.insert_protocol(ckb_protocol.build()); } let mut p2p_service = p2p_service .key_pair(local_private_key.clone()) diff --git a/network/src/protocol.rs b/network/src/protocol.rs index a68043c5cb..22e847fa6f 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -1,12 +1,14 @@ use crate::{peers_registry::Session, PeerId, ServiceContext, SessionContext}; +use bytes::Bytes; use futures::sync::mpsc::Sender; use log::{debug, error}; use p2p::{ + builder::MetaBuilder, multiaddr::Multiaddr, - traits::{ProtocolMeta, ServiceProtocol}, + service::{ProtocolHandle, ProtocolMeta}, + traits::ServiceProtocol, ProtocolId, }; -use tokio::codec::LengthDelimitedCodec; pub type Version = u8; @@ -50,34 +52,18 @@ impl CKBProtocol { pub fn match_version(&self, version: Version) -> bool { self.supported_versions.contains(&version) } -} - -impl ProtocolMeta for CKBProtocol { - fn name(&self) -> String { - self.protocol_name() - } - - fn id(&self) -> ProtocolId { - CKBProtocol::id(&self) - } - - fn codec(&self) -> LengthDelimitedCodec { - LengthDelimitedCodec::new() - } - - fn service_handle(&self) -> Option> { - let handler = Box::new(CKBHandler { - id: self.id(), - event_sender: self.event_sender.clone(), - }); - Some(handler) - } - fn support_versions(&self) -> Vec { - self.supported_versions - .iter() - .map(|v| format!("{}", v)) - .collect() + pub fn build(&self) -> ProtocolMeta { + let event_sender = self.event_sender.clone(); + MetaBuilder::default() + .id(self.id) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(CKBHandler { + id: self.id, + event_sender, + })) + }) + .build() } } @@ -146,14 +132,14 @@ impl ServiceProtocol for CKBHandler { } } - fn received(&mut self, _control: &mut ServiceContext, session: &SessionContext, data: Vec) { + fn received(&mut self, _control: &mut ServiceContext, session: &SessionContext, data: Bytes) { if let Some(peer_id) = session .remote_pubkey .as_ref() .map(|pubkey| pubkey.peer_id()) { debug!(target: "network", "ckb protocol received, addr: {}, protocol: {}, peer_id: {:?}", session.address, self.id, &peer_id); - self.send_event(Event::Received(peer_id, self.id, data)); + self.send_event(Event::Received(peer_id, self.id, data.to_vec())); } } fn notify(&mut self, _control: &mut ServiceContext, token: u64) { From 79e4277c5103cce40cc00dcbea05a309c6ccabea Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Mon, 18 Mar 2019 13:14:24 +0800 Subject: [PATCH 02/18] feat: Add discovery service --- Cargo.lock | 36 +-- network/Cargo.toml | 12 +- network/src/network.rs | 62 +++-- network/src/network_service.rs | 3 +- network/src/service.rs | 1 + network/src/service/discovery_service.rs | 291 +++++++++++++++++++++++ network/src/service/identify_service.rs | 236 ------------------ 7 files changed, 362 insertions(+), 279 deletions(-) create mode 100644 network/src/service/discovery_service.rs delete mode 100644 network/src/service/identify_service.rs diff --git a/Cargo.lock b/Cargo.lock index b8b75a5e3a..1d3cb48251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,10 +483,10 @@ dependencies = [ "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-discovery 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-ping 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.2 (git+https://github.com/paritytech/unsigned-varint)", ] @@ -2555,23 +2555,23 @@ dependencies = [ [[package]] name = "tentacle" version = "0.2.0-alpha.1" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-multiaddr 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-yamux 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", ] [[package]] name = "tentacle-discovery" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2582,7 +2582,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2590,20 +2590,20 @@ dependencies = [ [[package]] name = "tentacle-ping" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "generic-channel 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", ] [[package]] name = "tentacle-secio" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2916,7 +2916,7 @@ dependencies = [ [[package]] name = "tokio-yamux" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3462,10 +3462,10 @@ dependencies = [ "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" -"checksum tentacle 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6ca96f03210576ee49f63cca4683b5c3001d0482c6b8c00d6792553f7d9beec5" -"checksum tentacle-discovery 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d458765c30402c4265902cf95ff56a85c11306efd0d2d27776e3e200e310e39c" -"checksum tentacle-ping 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9da20c040b0cf48d93c0c91cbd4c3c2fb77e0e39cbc99f9ee474af15a4a20050" -"checksum tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9562b522b885f057e39aac9feb9048e64d4b9cc7f5c0be5933ae6606eb56ad40" +"checksum tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" +"checksum tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" +"checksum tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" +"checksum tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum termios 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72b620c5ea021d75a735c943269bb07d30c9b77d6ac6b236bc8b5c496ef05625" @@ -3491,7 +3491,7 @@ dependencies = [ "checksum tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "350c9edade9830dc185ae48ba45667a445ab59f6167ef6d0254ec9d2430d9dd3" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" -"checksum tokio-yamux 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1815b4502bb86c8ffaae8521ecc26e148edb4fcc2ed24c1feba0807a492b4946" +"checksum tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" "checksum toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "87c5890a989fa47ecdc7bcb4c63a77a82c18f306714104b1decfd722db17b39e" "checksum trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "65096825b064877da37eeeb9a83390bd23433eabfc503a6476dc5b1949034aa7" diff --git a/network/Cargo.toml b/network/Cargo.toml index 0d4ff2a07b..6ca30e8341 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -20,10 +20,14 @@ bytes = "0.4.12" tokio = "0.1.17" futures = { version = "0.1.19", features = ["use_std"] } snap = "0.2" -p2p = { version = "0.2.0-alpha.1", package="tentacle" } -secio = { version = "0.1.0", package="tentacle-secio" } -p2p-ping = { version = "0.2.0", package="tentacle-ping" } -p2p-discovery = { version = "0.1.0", package="tentacle-discovery" } +# p2p = { version = "0.2.0-alpha.1", package="tentacle" } +# secio = { version = "0.1.0", package="tentacle-secio" } +# p2p-ping = { version = "0.2.0", package="tentacle-ping" } +# p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-discovery" } +p2p = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle" } +secio = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-secio" } +p2p-ping = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-ping" } +p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-discovery" } faketime = "0.2.0" rusqlite = {version = "0.16.0", features = ["bundled"]} lazy_static = "1.3.0" diff --git a/network/src/network.rs b/network/src/network.rs index bd0d21d1f8..4f191e1d50 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -7,6 +7,7 @@ use crate::protocol::Version as ProtocolVersion; use crate::protocol_handler::{CKBProtocolHandler, DefaultCKBProtocolContext}; use crate::service::{ ckb_service::CKBService, + discovery_service::{DiscoveryEvent, DiscoveryProtocol, DiscoveryService}, outbound_peer_service::OutboundPeerService, ping_service::PingService, timer_service::{TimerRegistry, TimerService}, @@ -19,6 +20,7 @@ use bytes::Bytes; use ckb_util::{Mutex, RwLock}; use fnv::FnvHashMap; use futures::future::{select_all, Future}; +use futures::sync::mpsc; use futures::sync::mpsc::channel; use futures::sync::mpsc::Receiver; use futures::sync::oneshot; @@ -41,6 +43,7 @@ use std::time::{Duration, Instant}; use std::usize; const PING_PROTOCOL_ID: ProtocolId = 0; +const DISCOVERY_PROTOCOL_ID: ProtocolId = 1; pub type CKBProtocols = Vec<(CKBProtocol, Arc)>; type NetworkResult = Result< @@ -337,7 +340,16 @@ impl Network { pub(crate) fn inner_build( config: &NetworkConfig, ckb_protocols: CKBProtocols, - ) -> Result<(Arc, P2PService, TimerRegistry, Receiver), Error> { + ) -> Result< + ( + Arc, + P2PService, + TimerRegistry, + Receiver, + mpsc::UnboundedReceiver, + ), + Error, + > { let local_private_key = match config.fetch_private_key() { Some(private_key) => private_key?, None => return Err(ConfigError::InvalidKey.into()), @@ -383,6 +395,19 @@ impl Network { }) .build(); p2p_service = p2p_service.insert_protocol(ping_meta); + + let (disc_sender, disc_receiver) = mpsc::unbounded(); + let disc_meta = MetaBuilder::default() + .id(DISCOVERY_PROTOCOL_ID) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new( + DISCOVERY_PROTOCOL_ID, + disc_sender.clone(), + ))) + }) + .build(); + p2p_service = p2p_service.insert_protocol(disc_meta); + for (ckb_protocol, _) in &ckb_protocols { p2p_service = p2p_service.insert_protocol(ckb_protocol.build()); } @@ -458,7 +483,13 @@ impl Network { } } - Ok((network, p2p_service, timer_registry, ping_receiver)) + Ok(( + network, + p2p_service, + timer_registry, + ping_receiver, + disc_receiver, + )) } pub(crate) fn build_network_future( @@ -469,18 +500,14 @@ impl Network { timer_registry: TimerRegistry, ckb_event_receiver: Receiver, ping_event_receiver: Receiver, + disc_event_receiver: mpsc::UnboundedReceiver, ) -> Result + Send>, Error> { // initialize ckb_protocols let ping_service = PingService { network: Arc::clone(&network), event_receiver: ping_event_receiver, }; - //let identify_service = Arc::new(IdentifyService { - // client_version, - // protocol_version, - // identify_timeout: config.identify_timeout, - // identify_interval: config.identify_interval, - //}); + let disc_service = DiscoveryService::new(Arc::clone(&network), disc_event_receiver); let ckb_service = CKBService { event_receiver: ckb_event_receiver, @@ -506,17 +533,11 @@ impl Network { .for_each(|_| Ok(())) .map_err(|_err| Error::Shutdown), ), - // Box::new( - // discovery_query_service - // .into_future() - // .map(|_| ()) - // .map_err(|(err, _)| err), - // ) as Box + Send>, - //identify_service.start_protocol( - // Arc::clone(&network), - // swarm_controller.clone(), - // basic_transport.clone(), - //), + Box::new( + disc_service + .for_each(|_| Ok(())) + .map_err(|_err| Error::Shutdown), + ), Box::new(timer_service.timer_futures.for_each(|_| Ok(()))), Box::new( outbound_peer_service @@ -548,7 +569,7 @@ impl Network { ckb_protocols: CKBProtocols, ckb_event_receiver: Receiver, ) -> NetworkResult { - let (network, p2p_service, timer_registry, ping_event_receiver) = + let (network, p2p_service, timer_registry, ping_event_receiver, disc_event_receiver) = Self::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let network_future = Self::build_network_future( @@ -559,6 +580,7 @@ impl Network { timer_registry, ckb_event_receiver, ping_event_receiver, + disc_event_receiver, )?; Ok((network, close_tx, network_future)) } diff --git a/network/src/network_service.rs b/network/src/network_service.rs index 3eb367ffc8..56a8535b91 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -67,7 +67,7 @@ impl NetworkService { ckb_protocols: CKBProtocols, ckb_event_receiver: Receiver, ) -> Result { - let (network, p2p_service, timer_registry, ping_event_receiver) = + let (network, p2p_service, timer_registry, ping_event_receiver, disc_event_receiver) = Network::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let (init_tx, init_rx) = oneshot::channel(); @@ -88,6 +88,7 @@ impl NetworkService { timer_registry, ckb_event_receiver, ping_event_receiver, + disc_event_receiver, ) .expect("Network thread init"); init_tx.send(()).expect("Network init signal send"); diff --git a/network/src/service.rs b/network/src/service.rs index 18a76a1dc9..a815acd24e 100644 --- a/network/src/service.rs +++ b/network/src/service.rs @@ -1,4 +1,5 @@ pub mod ckb_service; +pub mod discovery_service; pub mod outbound_peer_service; pub mod ping_service; pub mod timer_service; diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs new file mode 100644 index 0000000000..4b82f474b9 --- /dev/null +++ b/network/src/service/discovery_service.rs @@ -0,0 +1,291 @@ +use crate::peer_store::Behaviour; +use crate::Network; +use fnv::FnvHashMap; +use futures::{sync::mpsc, sync::oneshot, Async, Future, Stream}; +use log::{debug, warn}; +use std::sync::Arc; + +use p2p::{ + context::{ServiceContext, SessionContext}, + multiaddr::Multiaddr, + secio::PeerId, + traits::ServiceProtocol, + yamux::session::SessionType, + ProtocolId, SessionId, +}; +use p2p_discovery::{ + AddressManager, Direction, Discovery, DiscoveryHandle, MisbehaveResult, Misbehavior, Substream, +}; + +pub struct DiscoveryProtocol { + id: ProtocolId, + discovery: Option>, + discovery_handle: DiscoveryHandle, + discovery_senders: FnvHashMap>>, + event_sender: mpsc::UnboundedSender, +} + +impl DiscoveryProtocol { + pub fn new( + id: ProtocolId, + event_sender: mpsc::UnboundedSender, + ) -> DiscoveryProtocol { + let addr_mgr = DiscoveryAddressManager { + event_sender: event_sender.clone(), + }; + let discovery = Discovery::new(addr_mgr); + let discovery_handle = discovery.handle(); + DiscoveryProtocol { + id, + discovery: Some(discovery), + discovery_handle, + discovery_senders: FnvHashMap::default(), + event_sender, + } + } +} + +impl ServiceProtocol for DiscoveryProtocol { + fn init(&mut self, control: &mut ServiceContext) { + debug!("protocol [discovery({})]: init", self.id); + + let discovery_task = self + .discovery + .take() + .map(|discovery| { + debug!("Start discovery future_task"); + discovery + .for_each(|()| Ok(())) + .map_err(|err| { + warn!("discovery stream error: {:?}", err); + }) + .then(|_| { + debug!("End of discovery"); + Ok(()) + }) + }) + .unwrap(); + control.future_task(discovery_task); + } + + fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, _: &str) { + debug!( + "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", + session.id, session.address, session.ty + ); + let event = DiscoveryEvent::Connected { + session_id: session.id, + peer_id: session.remote_pubkey.clone().map(|pubkey| pubkey.peer_id()), + }; + // TODO: handle send failed + self.event_sender + .unbounded_send(event) + .expect("Send connected event failed"); + + let direction = if session.ty == SessionType::Server { + Direction::Inbound + } else { + Direction::Outbound + }; + let (sender, receiver) = mpsc::channel(8); + self.discovery_senders.insert(session.id, sender); + let substream = Substream::new( + session.address.clone(), + direction, + self.id, + session.id, + receiver, + control.control().clone(), + control.listens(), + ); + match self.discovery_handle.substream_sender.try_send(substream) { + Ok(_) => { + debug!("Send substream success"); + } + Err(err) => { + // TODO: handle channel is full (wait for poll API?) + warn!("Send substream failed : {:?}", err); + } + } + } + + fn disconnected(&mut self, _control: &mut ServiceContext, session: &SessionContext) { + let event = DiscoveryEvent::Disconnected(session.id); + // TODO: handle send failed + self.event_sender + .unbounded_send(event) + .expect("Send disconnected event failed"); + self.discovery_senders.remove(&session.id); + debug!("protocol [discovery] close on session [{}]", session.id); + } + + fn received( + &mut self, + _control: &mut ServiceContext, + session: &SessionContext, + data: bytes::Bytes, + ) { + debug!("[received message]: length={}", data.len()); + + if let Some(ref mut sender) = self.discovery_senders.get_mut(&session.id) { + // TODO: handle channel is full (wait for poll API?) + if let Err(err) = sender.try_send(data.to_vec()) { + if err.is_full() { + warn!("channel is full"); + } else if err.is_disconnected() { + warn!("channel is disconnected"); + } else { + warn!("other channel error: {:?}", err); + } + } + } + } +} + +pub enum DiscoveryEvent { + Connected { + session_id: SessionId, + peer_id: Option, + }, + Disconnected(SessionId), + AddNewAddr { + session_id: SessionId, + addr: Multiaddr, + }, + AddNewAddrs { + session_id: SessionId, + addrs: Vec, + }, + Misbehave { + session_id: SessionId, + kind: Misbehavior, + result: oneshot::Sender, + }, + GetRandom { + n: usize, + result: oneshot::Sender>, + }, +} + +pub struct DiscoveryService { + event_receiver: mpsc::UnboundedReceiver, + network: Arc, + sessions: FnvHashMap, +} + +impl DiscoveryService { + pub fn new( + network: Arc, + event_receiver: mpsc::UnboundedReceiver, + ) -> DiscoveryService { + DiscoveryService { + event_receiver, + network, + sessions: FnvHashMap::default(), + } + } +} + +impl Stream for DiscoveryService { + type Item = (); + type Error = (); + fn poll(&mut self) -> Result>, Self::Error> { + match try_ready!(self.event_receiver.poll()) { + Some(DiscoveryEvent::Connected { + session_id, + peer_id, + }) => { + if let Some(peer_id) = peer_id { + self.sessions.insert(session_id, peer_id); + } + } + Some(DiscoveryEvent::Disconnected(session_id)) => { + self.sessions.remove(&session_id); + } + Some(DiscoveryEvent::AddNewAddr { session_id, addr }) => { + if let Some(peer_id) = self.sessions.get(&session_id) { + let _ = self + .network + .peer_store() + .write() + .add_discovered_address(peer_id, addr); + } + } + Some(DiscoveryEvent::AddNewAddrs { session_id, addrs }) => { + if let Some(peer_id) = self.sessions.get(&session_id) { + let _ = self + .network + .peer_store() + .write() + .add_discovered_addresses(peer_id, addrs); + } + } + Some(DiscoveryEvent::Misbehave { + session_id: _session_id, + kind: _kind, + result: _result, + }) => { + // FIXME: + } + Some(DiscoveryEvent::GetRandom { n, result }) => { + let addrs = self + .network + .peer_store() + .read() + .peers_to_attempt(n as u32) + .into_iter() + .map(|(_peer_id, addr)| addr) + .collect(); + result + .send(addrs) + .expect("Send failed (should not happened)"); + } + None => { + debug!(target: "network", "discovery service shutdown"); + return Ok(Async::Ready(None)); + } + } + Ok(Async::Ready(Some(()))) + } +} + +pub struct DiscoveryAddressManager { + pub event_sender: mpsc::UnboundedSender, +} + +impl AddressManager for DiscoveryAddressManager { + fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) { + // FIXME: what if send failed + let event = DiscoveryEvent::AddNewAddr { session_id, addr }; + self.event_sender.unbounded_send(event).unwrap(); + } + + fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { + // FIXME: what if send failed + let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; + self.event_sender.unbounded_send(event).unwrap(); + } + + fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult { + let (sender, receiver) = oneshot::channel(); + // FIXME: what if send failed + let event = DiscoveryEvent::Misbehave { + session_id, + kind, + result: sender, + }; + self.event_sender.unbounded_send(event).unwrap(); + // FIXME: what if receive failed + receiver.wait().unwrap() + } + + fn get_random(&mut self, n: usize) -> Vec { + let (sender, receiver) = oneshot::channel(); + // FIXME: what if send failed + self.event_sender + .unbounded_send(DiscoveryEvent::GetRandom { n, result: sender }) + .unwrap(); + // FIXME: what if receive failed + receiver.wait().unwrap() + } +} diff --git a/network/src/service/identify_service.rs b/network/src/service/identify_service.rs deleted file mode 100644 index 079be95127..0000000000 --- a/network/src/service/identify_service.rs +++ /dev/null @@ -1,236 +0,0 @@ -#![allow(clippy::needless_pass_by_value)] - -use super::Network; -use super::PeerId; -use crate::peers_registry::PeerIdentifyInfo; -use crate::protocol::Protocol; -use crate::protocol_service::ProtocolService; -use crate::transport::TransportOutput; -use futures::future::{self, Future}; -use futures::Stream; -use libp2p::core::Multiaddr; -use libp2p::core::SwarmController; -use libp2p::core::{upgrade, MuxedTransport}; -use libp2p::identify::IdentifyProtocolConfig; -use libp2p::identify::{IdentifyInfo, IdentifyOutput}; -use libp2p::{self, Transport}; -use log::{debug, error, trace, warn}; -use std::boxed::Box; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::timer::Interval; - -const MAX_LISTENED_ADDRS: usize = 10; - -pub struct IdentifyService { - pub client_version: String, - pub protocol_version: String, - pub identify_timeout: Duration, - pub identify_interval: Duration, -} - -impl IdentifyService { - fn process_identify_info( - &self, - network: Arc, - peer_id: &PeerId, - info: &IdentifyInfo, - observed_addr: &Multiaddr, - ) -> Result<(), IoError> { - trace!("process identify for peer_id {:?} with {:?}", peer_id, info); - // set identify info to peer - { - let identify_info = PeerIdentifyInfo { - client_version: info.agent_version.clone(), - protocol_version: info.protocol_version.clone(), - supported_protocols: info.protocols.clone(), - count_of_known_listen_addrs: info.listen_addrs.len(), - }; - if network - .set_peer_identify_info(&peer_id, identify_info) - .is_err() - { - error!( - target: "network", - "can't find peer_id {:?} during process identify info", - peer_id - ) - } - } - - // add obserevd listened addr - for original_address in network.original_listened_addresses.read().iter() { - let transport = libp2p::tcp::TcpConfig::new(); - trace!( - target: "network", - "try get address use original_address {:?} and observed_address {:?}", - original_address, - observed_addr - ); - // get an external addrs for our node - if let Some(ext_addr) = transport.nat_traversal(original_address, &observed_addr) { - debug!(target: "network", "get new external address {:?}", ext_addr); - network.discovery_listened_address(ext_addr.to_owned()); - } - } - - // update peer addrs in peerstore - let _ = network - .peer_store() - .write() - .add_discovered_addresses(peer_id, info.listen_addrs.clone()); - Ok(()) - } -} - -impl ProtocolService for IdentifyService -where - T: AsyncRead + AsyncWrite + Send + 'static, -{ - type Output = IdentifyOutput; - fn convert_to_protocol( - peer_id: Arc, - addr: &Multiaddr, - output: Self::Output, - ) -> Protocol { - let peer_id = PeerId::clone(&peer_id); - match output { - IdentifyOutput::RemoteInfo { - info, - observed_addr, - } => Protocol::IdentifyRequest(peer_id, info, observed_addr), - IdentifyOutput::Sender { sender } => { - Protocol::IdentifyResponse(peer_id, sender, addr.to_owned()) - } - } - } - - fn handle( - &self, - network: Arc, - protocol: Protocol, - ) -> Box + Send> { - match protocol { - Protocol::IdentifyRequest(peer_id, info, ovserved_addr) => match self - .process_identify_info(Arc::clone(&network), &peer_id, &info, &ovserved_addr) - { - Ok(_) => Box::new(future::ok(())), - Err(err) => Box::new(future::err(err)), - }, - Protocol::IdentifyResponse(_peer_id, sender, addr) => { - sender.send( - IdentifyInfo { - public_key: network.local_public_key().clone(), - protocol_version: format!("ckb/{}", self.protocol_version).to_owned(), - agent_version: format!("ckb/{}", self.client_version).to_owned(), - listen_addrs: network - .listened_addresses(MAX_LISTENED_ADDRS) - .into_iter() - .map(|(addr, _)| addr) - .collect(), - protocols: vec![], // TODO FIXME: report local protocols - }, - &addr, - ) - } - _ => Box::new(future::ok(())) as Box + Send>, - } - } - - fn start_protocol( - &self, - network: Arc, - swarm_controller: SwarmController< - SwarmTran, - Box + Send>, - >, - transport: Tran, - ) -> Box + Send> - where - SwarmTran: MuxedTransport> + Clone + Send + 'static, - SwarmTran::MultiaddrFuture: Send + 'static, - SwarmTran::Dial: Send, - SwarmTran::Listener: Send, - SwarmTran::ListenerUpgrade: Send, - SwarmTran::Incoming: Send, - SwarmTran::IncomingUpgrade: Send, - Tran: MuxedTransport> + Clone + Send + 'static, - Tran::MultiaddrFuture: Send + 'static, - Tran::Dial: Send, - Tran::Listener: Send, - Tran::ListenerUpgrade: Send, - Tran::Incoming: Send, - Tran::IncomingUpgrade: Send, - TranOut: AsyncRead + AsyncWrite + Send + 'static, - { - let transport = transport.and_then(move |out, endpoint, client_addr| { - let peer_id = out.peer_id; - upgrade::apply(out.socket, IdentifyProtocolConfig, endpoint, client_addr).map( - move |(output, addr)| { - let protocol = match output { - IdentifyOutput::RemoteInfo { - info, - observed_addr, - } => Protocol::IdentifyRequest(peer_id, info, observed_addr), - IdentifyOutput::Sender { .. } => { - panic!("should not reach here because we are dialer") - } - }; - (protocol, addr) - }, - ) - }); - - let periodic_identify_future = Interval::new( - Instant::now() + Duration::from_secs(5), - self.identify_interval, - ) - .map_err(|err| { - debug!(target: "network", "identify periodic error {:?}", err); - IoError::new( - IoErrorKind::Other, - format!("identify periodic error {:?}", err), - ) - }) - .for_each({ - let transport = transport.clone(); - let _identify_timeout = self.identify_timeout; - let network = Arc::clone(&network); - move |_| { - for peer_id in network.peers() { - if let Some(ref identify_info) = network.get_peer_identify_info(&peer_id) { - if identify_info.count_of_known_listen_addrs > 0 { - continue; - } - } - // TODO should we try all addresses? - if let Some(addr) = network.get_peer_addresses(&peer_id).get(0) { - trace!( - target: "network", - "request identify to peer {:?} {:?}", - peer_id, - addr - ); - // dial identify - let _ = swarm_controller.dial(addr.clone(), transport.clone()); - } else { - error!( - target: "network", - "error when prepare identify : can't find addresses for peer {:?}", - peer_id - ); - } - } - Ok(()) - } - }) - .then(|err| { - warn!(target: "network", "Identify service stopped, reason: {:?}", err); - err - }); - Box::new(periodic_identify_future) as Box + Send> - } -} From 2b4a02b05fa15ee4590d83235cd137ea67d5b921 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 19 Mar 2019 10:37:32 +0800 Subject: [PATCH 03/18] refactor: Use Bytes instead of Vec as data type --- Cargo.lock | 1 + network/src/protocol.rs | 4 ++-- network/src/protocol_handler.rs | 3 ++- network/src/service/discovery_service.rs | 12 +++--------- sync/Cargo.toml | 1 + sync/src/net_time_checker.rs | 3 ++- sync/src/relayer/mod.rs | 3 ++- sync/src/synchronizer/mod.rs | 3 ++- 8 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d3cb48251..8422850c5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -617,6 +617,7 @@ version = "0.7.0-pre" dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "bloom-filters 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "ckb-chain 0.7.0-pre", "ckb-chain-spec 0.7.0-pre", "ckb-core 0.7.0-pre", diff --git a/network/src/protocol.rs b/network/src/protocol.rs index 22e847fa6f..4029f77f9d 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -71,7 +71,7 @@ impl CKBProtocol { pub enum Event { Connected(PeerId, Multiaddr, Session, ProtocolId, Version), Disconnected(PeerId, ProtocolId), - Received(PeerId, ProtocolId, Vec), + Received(PeerId, ProtocolId, Bytes), Notify(ProtocolId, u64), } @@ -139,7 +139,7 @@ impl ServiceProtocol for CKBHandler { .map(|pubkey| pubkey.peer_id()) { debug!(target: "network", "ckb protocol received, addr: {}, protocol: {}, peer_id: {:?}", session.address, self.id, &peer_id); - self.send_event(Event::Received(peer_id, self.id, data.to_vec())); + self.send_event(Event::Received(peer_id, self.id, data)); } } fn notify(&mut self, _control: &mut ServiceContext, token: u64) { diff --git a/network/src/protocol_handler.rs b/network/src/protocol_handler.rs index 6f62a8d195..61ca5aca1b 100644 --- a/network/src/protocol_handler.rs +++ b/network/src/protocol_handler.rs @@ -1,5 +1,6 @@ use crate::errors::{Error, PeerError, ProtocolError}; use crate::{Network, PeerIndex, ProtocolId, SessionInfo, TimerRegistry, TimerToken}; +use bytes::Bytes; use ckb_util::Mutex; use log::debug; use log::info; @@ -148,7 +149,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { pub trait CKBProtocolHandler: Sync + Send { fn initialize(&self, _nc: Box); - fn received(&self, _nc: Box, _peer: PeerIndex, _data: Vec); + fn received(&self, _nc: Box, _peer: PeerIndex, _data: Bytes); fn connected(&self, _nc: Box, _peer: PeerIndex); fn disconnected(&self, _nc: Box, _peer: PeerIndex); fn timer_triggered(&self, _nc: Box, _timer: TimerToken) {} diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index 4b82f474b9..ddcc97dc46 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -1,4 +1,4 @@ -use crate::peer_store::Behaviour; +// use crate::peer_store::Behaviour; use crate::Network; use fnv::FnvHashMap; use futures::{sync::mpsc, sync::oneshot, Async, Future, Stream}; @@ -202,14 +202,8 @@ impl Stream for DiscoveryService { Some(DiscoveryEvent::Disconnected(session_id)) => { self.sessions.remove(&session_id); } - Some(DiscoveryEvent::AddNewAddr { session_id, addr }) => { - if let Some(peer_id) = self.sessions.get(&session_id) { - let _ = self - .network - .peer_store() - .write() - .add_discovered_address(peer_id, addr); - } + Some(DiscoveryEvent::AddNewAddr { .. }) => { + // NOTE: ignore add new addr message, handle this in identify protocol } Some(DiscoveryEvent::AddNewAddrs { session_id, addrs }) => { if let Some(peer_id) = self.sessions.get(&session_id) { diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 92881ac0bf..2a996dc998 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -26,6 +26,7 @@ ckb-chain-spec = { path = "../spec" } bloom-filters = "0.1.0" ckb-traits = { path = "../traits" } failure = "0.1.5" +bytes = "0.4.12" [dev-dependencies] ckb-notify = { path = "../notify" } diff --git a/sync/src/net_time_checker.rs b/sync/src/net_time_checker.rs index 5b621870fe..2750f7f3cc 100644 --- a/sync/src/net_time_checker.rs +++ b/sync/src/net_time_checker.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex}; use ckb_protocol::TimeMessage; use ckb_util::RwLock; @@ -90,7 +91,7 @@ impl Default for NetTimeProtocol { impl CKBProtocolHandler for NetTimeProtocol { fn initialize(&self, _nc: Box) {} - fn received(&self, nc: Box, peer: PeerIndex, data: Vec) { + fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { // collect time sample from outbound peer if nc.session_info(peer).map(|s| s.peer.is_outbound()) == Some(true) { let now: u64 = faketime::unix_time_as_millis(); diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 115469220f..2c2f28f8f9 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -17,6 +17,7 @@ use self::get_block_transactions_process::GetBlockTransactionsProcess; use self::transaction_process::TransactionProcess; use crate::relayer::compact_block::ShortTransactionID; use crate::types::Peers; +use bytes::Bytes; use ckb_chain::chain::ChainController; use ckb_core::block::{Block, BlockBuilder}; use ckb_core::transaction::{ProposalShortId, Transaction}; @@ -269,7 +270,7 @@ where let _ = nc.register_timer(TX_PROPOSAL_TOKEN, Duration::from_millis(100)); } - fn received(&self, nc: Box, peer: PeerIndex, data: Vec) { + fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { // TODO use flatbuffers verifier let msg = get_root::(&data); debug!(target: "relay", "msg {:?}", msg.payload_type()); diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 56d99a2d30..886ac968be 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -21,6 +21,7 @@ use crate::{ MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT, MAX_TIP_AGE, POW_SPACE, }; use bitflags::bitflags; +use bytes::Bytes; use ckb_chain::chain::ChainController; use ckb_chain_spec::consensus::Consensus; use ckb_core::block::Block; @@ -691,7 +692,7 @@ where let _ = nc.register_timer(TIMEOUT_EVICTION_TOKEN, Duration::from_millis(1000)); } - fn received(&self, nc: Box, peer: PeerIndex, data: Vec) { + fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { // TODO use flatbuffers verifier let msg = get_root::(&data); debug!(target: "sync", "msg {:?}", msg.payload_type()); From c367508b48955b94cc06c08fdb904f936c897972 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 19 Mar 2019 10:39:56 +0800 Subject: [PATCH 04/18] fix: Add target field for log --- network/src/service/discovery_service.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index ddcc97dc46..869dc2f546 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -47,20 +47,20 @@ impl DiscoveryProtocol { impl ServiceProtocol for DiscoveryProtocol { fn init(&mut self, control: &mut ServiceContext) { - debug!("protocol [discovery({})]: init", self.id); + debug!(target: "network", "protocol [discovery({})]: init", self.id); let discovery_task = self .discovery .take() .map(|discovery| { - debug!("Start discovery future_task"); + debug!(target: "network", "Start discovery future_task"); discovery .for_each(|()| Ok(())) .map_err(|err| { - warn!("discovery stream error: {:?}", err); + warn!(target: "network", "discovery stream error: {:?}", err); }) .then(|_| { - debug!("End of discovery"); + debug!(target: "network", "End of discovery"); Ok(()) }) }) @@ -70,6 +70,7 @@ impl ServiceProtocol for DiscoveryProtocol { fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, _: &str) { debug!( + target: "network", "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", session.id, session.address, session.ty ); @@ -100,11 +101,11 @@ impl ServiceProtocol for DiscoveryProtocol { ); match self.discovery_handle.substream_sender.try_send(substream) { Ok(_) => { - debug!("Send substream success"); + debug!(target: "network", "Send substream success"); } Err(err) => { // TODO: handle channel is full (wait for poll API?) - warn!("Send substream failed : {:?}", err); + warn!(target: "network", "Send substream failed : {:?}", err); } } } @@ -116,7 +117,7 @@ impl ServiceProtocol for DiscoveryProtocol { .unbounded_send(event) .expect("Send disconnected event failed"); self.discovery_senders.remove(&session.id); - debug!("protocol [discovery] close on session [{}]", session.id); + debug!(target: "network", "protocol [discovery] close on session [{}]", session.id); } fn received( @@ -125,17 +126,17 @@ impl ServiceProtocol for DiscoveryProtocol { session: &SessionContext, data: bytes::Bytes, ) { - debug!("[received message]: length={}", data.len()); + debug!(target: "network", "[received message]: length={}", data.len()); if let Some(ref mut sender) = self.discovery_senders.get_mut(&session.id) { // TODO: handle channel is full (wait for poll API?) if let Err(err) = sender.try_send(data.to_vec()) { if err.is_full() { - warn!("channel is full"); + warn!(target: "network", "channel is full"); } else if err.is_disconnected() { - warn!("channel is disconnected"); + warn!(target: "network", "channel is disconnected"); } else { - warn!("other channel error: {:?}", err); + warn!(target: "network", "other channel error: {:?}", err); } } } From 1b607ebc51d0416b3d73736b91b01c325704fa06 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 19 Mar 2019 10:47:50 +0800 Subject: [PATCH 05/18] fix: Handle channel errors --- network/src/service/discovery_service.rs | 46 ++++++++++++++---------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index 869dc2f546..10dc4b05ed 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -78,10 +78,10 @@ impl ServiceProtocol for DiscoveryProtocol { session_id: session.id, peer_id: session.remote_pubkey.clone().map(|pubkey| pubkey.peer_id()), }; - // TODO: handle send failed - self.event_sender - .unbounded_send(event) - .expect("Send connected event failed"); + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + return; + } let direction = if session.ty == SessionType::Server { Direction::Inbound @@ -112,10 +112,10 @@ impl ServiceProtocol for DiscoveryProtocol { fn disconnected(&mut self, _control: &mut ServiceContext, session: &SessionContext) { let event = DiscoveryEvent::Disconnected(session.id); - // TODO: handle send failed - self.event_sender - .unbounded_send(event) - .expect("Send disconnected event failed"); + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + return; + } self.discovery_senders.remove(&session.id); debug!(target: "network", "protocol [discovery] close on session [{}]", session.id); } @@ -252,13 +252,17 @@ impl AddressManager for DiscoveryAddressManager { fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) { // FIXME: what if send failed let event = DiscoveryEvent::AddNewAddr { session_id, addr }; - self.event_sender.unbounded_send(event).unwrap(); + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + } } fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { // FIXME: what if send failed let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; - self.event_sender.unbounded_send(event).unwrap(); + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + } } fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult { @@ -269,18 +273,24 @@ impl AddressManager for DiscoveryAddressManager { kind, result: sender, }; - self.event_sender.unbounded_send(event).unwrap(); - // FIXME: what if receive failed - receiver.wait().unwrap() + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + MisbehaveResult::Disconnect + } else { + // FIXME: what if receive failed + receiver.wait().unwrap_or(MisbehaveResult::Disconnect) + } } fn get_random(&mut self, n: usize) -> Vec { let (sender, receiver) = oneshot::channel(); // FIXME: what if send failed - self.event_sender - .unbounded_send(DiscoveryEvent::GetRandom { n, result: sender }) - .unwrap(); - // FIXME: what if receive failed - receiver.wait().unwrap() + let event = DiscoveryEvent::GetRandom { n, result: sender }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + Vec::new() + } else { + receiver.wait().ok().unwrap_or_else(Vec::new) + } } } From d662898c89b62fa28a5e6e0bb66113b778de8668 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 19 Mar 2019 10:51:11 +0800 Subject: [PATCH 06/18] fix: Use change p2p dependency from branch to reversion --- Cargo.lock | 36 ++++++++++++++++++------------------ network/Cargo.toml | 8 ++++---- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8422850c5c..fa3e9efaa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,10 +483,10 @@ dependencies = [ "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", - "tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", - "tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", - "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.2 (git+https://github.com/paritytech/unsigned-varint)", ] @@ -2556,23 +2556,23 @@ dependencies = [ [[package]] name = "tentacle" version = "0.2.0-alpha.1" -source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-multiaddr 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", ] [[package]] name = "tentacle-discovery" version = "0.1.0" -source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2583,7 +2583,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2591,20 +2591,20 @@ dependencies = [ [[package]] name = "tentacle-ping" version = "0.2.0" -source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "generic-channel 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", ] [[package]] name = "tentacle-secio" version = "0.1.0" -source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2917,7 +2917,7 @@ dependencies = [ [[package]] name = "tokio-yamux" version = "0.1.4" -source = "git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format#9c42be05724c3e2decb73716d0f515c36a7af1cb" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3463,10 +3463,10 @@ dependencies = [ "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" -"checksum tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" -"checksum tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" -"checksum tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" -"checksum tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" +"checksum tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum termios 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72b620c5ea021d75a735c943269bb07d30c9b77d6ac6b236bc8b5c496ef05625" @@ -3492,7 +3492,7 @@ dependencies = [ "checksum tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "350c9edade9830dc185ae48ba45667a445ab59f6167ef6d0254ec9d2430d9dd3" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" -"checksum tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?branch=discovery-use-multiaddr-format)" = "" +"checksum tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" "checksum toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "87c5890a989fa47ecdc7bcb4c63a77a82c18f306714104b1decfd722db17b39e" "checksum trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "65096825b064877da37eeeb9a83390bd23433eabfc503a6476dc5b1949034aa7" diff --git a/network/Cargo.toml b/network/Cargo.toml index 6ca30e8341..a889e38c34 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -24,10 +24,10 @@ snap = "0.2" # secio = { version = "0.1.0", package="tentacle-secio" } # p2p-ping = { version = "0.2.0", package="tentacle-ping" } # p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-discovery" } -p2p = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle" } -secio = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-secio" } -p2p-ping = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-ping" } -p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-discovery" } +p2p = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle" } +secio = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-secio" } +p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-ping" } +p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-discovery" } faketime = "0.2.0" rusqlite = {version = "0.16.0", features = ["bundled"]} lazy_static = "1.3.0" From ffaf8257ebdb1bb42c2ea71ce0bc7737f50fb36f Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 19 Mar 2019 11:31:25 +0800 Subject: [PATCH 07/18] chore: Remove outdated FIXME mark --- network/src/service/discovery_service.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index 10dc4b05ed..7a872a9321 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -250,7 +250,6 @@ pub struct DiscoveryAddressManager { impl AddressManager for DiscoveryAddressManager { fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) { - // FIXME: what if send failed let event = DiscoveryEvent::AddNewAddr { session_id, addr }; if self.event_sender.unbounded_send(event).is_err() { warn!(target: "network", "receiver maybe dropped!"); @@ -258,7 +257,6 @@ impl AddressManager for DiscoveryAddressManager { } fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { - // FIXME: what if send failed let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; if self.event_sender.unbounded_send(event).is_err() { warn!(target: "network", "receiver maybe dropped!"); @@ -267,7 +265,6 @@ impl AddressManager for DiscoveryAddressManager { fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult { let (sender, receiver) = oneshot::channel(); - // FIXME: what if send failed let event = DiscoveryEvent::Misbehave { session_id, kind, @@ -277,14 +274,12 @@ impl AddressManager for DiscoveryAddressManager { warn!(target: "network", "receiver maybe dropped!"); MisbehaveResult::Disconnect } else { - // FIXME: what if receive failed receiver.wait().unwrap_or(MisbehaveResult::Disconnect) } } fn get_random(&mut self, n: usize) -> Vec { let (sender, receiver) = oneshot::channel(); - // FIXME: what if send failed let event = DiscoveryEvent::GetRandom { n, result: sender }; if self.event_sender.unbounded_send(event).is_err() { warn!(target: "network", "receiver maybe dropped!"); From 4a27aaf9482587f1921e8a74fda940f679764478 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 19 Mar 2019 15:25:02 +0800 Subject: [PATCH 08/18] feat: Add identify protocol --- Cargo.lock | 14 +++ network/Cargo.toml | 1 + network/src/network.rs | 68 ++++++++----- network/src/network_service.rs | 5 +- network/src/service.rs | 1 + network/src/service/identify_service.rs | 121 ++++++++++++++++++++++++ 6 files changed, 184 insertions(+), 26 deletions(-) create mode 100644 network/src/service/identify_service.rs diff --git a/Cargo.lock b/Cargo.lock index fa3e9efaa4..d6878be97a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,6 +485,7 @@ dependencies = [ "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-identify 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2588,6 +2589,18 @@ dependencies = [ "trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tentacle-identify" +version = "0.1.0" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tentacle-ping" version = "0.2.0" @@ -3465,6 +3478,7 @@ dependencies = [ "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" "checksum tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-identify 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" diff --git a/network/Cargo.toml b/network/Cargo.toml index a889e38c34..d44f936757 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -28,6 +28,7 @@ p2p = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73 secio = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-secio" } p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-ping" } p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-discovery" } +p2p-identify = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-identify" } faketime = "0.2.0" rusqlite = {version = "0.16.0", features = ["bundled"]} lazy_static = "1.3.0" diff --git a/network/src/network.rs b/network/src/network.rs index 4f191e1d50..5a503017e4 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -8,6 +8,7 @@ use crate::protocol_handler::{CKBProtocolHandler, DefaultCKBProtocolContext}; use crate::service::{ ckb_service::CKBService, discovery_service::{DiscoveryEvent, DiscoveryProtocol, DiscoveryService}, + identify_service::{IdentifyAddressManager, IdentifyEvent, IdentifyProtocol, IdentifyService}, outbound_peer_service::OutboundPeerService, ping_service::PingService, timer_service::{TimerRegistry, TimerService}, @@ -44,6 +45,7 @@ use std::usize; const PING_PROTOCOL_ID: ProtocolId = 0; const DISCOVERY_PROTOCOL_ID: ProtocolId = 1; +const IDENTIFY_PROTOCOL_ID: ProtocolId = 2; pub type CKBProtocols = Vec<(CKBProtocol, Arc)>; type NetworkResult = Result< @@ -82,6 +84,12 @@ impl PeerInfo { } } +pub struct EventReceivers { + pub ping_event_receiver: Receiver, + pub disc_event_receiver: mpsc::UnboundedReceiver, + pub identify_event_receiver: mpsc::UnboundedReceiver, +} + type P2PService = Service; pub struct Network { @@ -340,16 +348,7 @@ impl Network { pub(crate) fn inner_build( config: &NetworkConfig, ckb_protocols: CKBProtocols, - ) -> Result< - ( - Arc, - P2PService, - TimerRegistry, - Receiver, - mpsc::UnboundedReceiver, - ), - Error, - > { + ) -> Result<(Arc, P2PService, TimerRegistry, EventReceivers), Error> { let local_private_key = match config.fetch_private_key() { Some(private_key) => private_key?, None => return Err(ConfigError::InvalidKey.into()), @@ -380,7 +379,6 @@ impl Network { config.reserved_only, reserved_peers, ); - let mut p2p_service = ServiceBuilder::default().forever(true); // register protocols let (ping_sender, ping_receiver) = channel(std::u8::MAX as usize); let ping_meta = MetaBuilder::default() @@ -394,7 +392,6 @@ impl Network { ))) }) .build(); - p2p_service = p2p_service.insert_protocol(ping_meta); let (disc_sender, disc_receiver) = mpsc::unbounded(); let disc_meta = MetaBuilder::default() @@ -406,8 +403,24 @@ impl Network { ))) }) .build(); - p2p_service = p2p_service.insert_protocol(disc_meta); + let (identify_sender, identify_receiver) = mpsc::unbounded(); + let identify_addr_mgr = IdentifyAddressManager::new(identify_sender.clone()); + let identify_meta = MetaBuilder::default() + .id(IDENTIFY_PROTOCOL_ID) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(IdentifyProtocol::new( + IDENTIFY_PROTOCOL_ID, + identify_addr_mgr.clone(), + ))) + }) + .build(); + + let mut p2p_service = ServiceBuilder::default() + .forever(true) + .insert_protocol(ping_meta) + .insert_protocol(disc_meta) + .insert_protocol(identify_meta); for (ckb_protocol, _) in &ckb_protocols { p2p_service = p2p_service.insert_protocol(ckb_protocol.build()); } @@ -487,8 +500,11 @@ impl Network { network, p2p_service, timer_registry, - ping_receiver, - disc_receiver, + EventReceivers { + ping_event_receiver: ping_receiver, + disc_event_receiver: disc_receiver, + identify_event_receiver: identify_receiver, + }, )) } @@ -499,19 +515,21 @@ impl Network { p2p_service: P2PService, timer_registry: TimerRegistry, ckb_event_receiver: Receiver, - ping_event_receiver: Receiver, - disc_event_receiver: mpsc::UnboundedReceiver, + receivers: EventReceivers, ) -> Result + Send>, Error> { // initialize ckb_protocols let ping_service = PingService { network: Arc::clone(&network), - event_receiver: ping_event_receiver, + event_receiver: receivers.ping_event_receiver, }; - let disc_service = DiscoveryService::new(Arc::clone(&network), disc_event_receiver); + let disc_service = + DiscoveryService::new(Arc::clone(&network), receivers.disc_event_receiver); + let identify_service = + IdentifyService::new(Arc::clone(&network), receivers.identify_event_receiver); let ckb_service = CKBService { - event_receiver: ckb_event_receiver, network: Arc::clone(&network), + event_receiver: ckb_event_receiver, }; let timer_service = TimerService::new(timer_registry, Arc::clone(&network)); let outbound_peer_service = @@ -538,6 +556,11 @@ impl Network { .for_each(|_| Ok(())) .map_err(|_err| Error::Shutdown), ), + Box::new( + identify_service + .for_each(|_| Ok(())) + .map_err(|_err| Error::Shutdown), + ), Box::new(timer_service.timer_futures.for_each(|_| Ok(()))), Box::new( outbound_peer_service @@ -569,7 +592,7 @@ impl Network { ckb_protocols: CKBProtocols, ckb_event_receiver: Receiver, ) -> NetworkResult { - let (network, p2p_service, timer_registry, ping_event_receiver, disc_event_receiver) = + let (network, p2p_service, timer_registry, receivers) = Self::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let network_future = Self::build_network_future( @@ -579,8 +602,7 @@ impl Network { p2p_service, timer_registry, ckb_event_receiver, - ping_event_receiver, - disc_event_receiver, + receivers, )?; Ok((network, close_tx, network_future)) } diff --git a/network/src/network_service.rs b/network/src/network_service.rs index 56a8535b91..afbdfd79f4 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -67,7 +67,7 @@ impl NetworkService { ckb_protocols: CKBProtocols, ckb_event_receiver: Receiver, ) -> Result { - let (network, p2p_service, timer_registry, ping_event_receiver, disc_event_receiver) = + let (network, p2p_service, timer_registry, receivers) = Network::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let (init_tx, init_rx) = oneshot::channel(); @@ -87,8 +87,7 @@ impl NetworkService { p2p_service, timer_registry, ckb_event_receiver, - ping_event_receiver, - disc_event_receiver, + receivers, ) .expect("Network thread init"); init_tx.send(()).expect("Network init signal send"); diff --git a/network/src/service.rs b/network/src/service.rs index a815acd24e..9dd961cdfe 100644 --- a/network/src/service.rs +++ b/network/src/service.rs @@ -1,5 +1,6 @@ pub mod ckb_service; pub mod discovery_service; +pub mod identify_service; pub mod outbound_peer_service; pub mod ping_service; pub mod timer_service; diff --git a/network/src/service/identify_service.rs b/network/src/service/identify_service.rs new file mode 100644 index 0000000000..8155e6d1f4 --- /dev/null +++ b/network/src/service/identify_service.rs @@ -0,0 +1,121 @@ +// use crate::peer_store::Behaviour; +use crate::Network; +use futures::{sync::mpsc, sync::oneshot, Async, Future, Stream}; +use log::{debug, warn}; +use std::sync::Arc; + +use p2p::{multiaddr::Multiaddr, secio::PeerId}; + +pub use p2p_identify::IdentifyProtocol; +use p2p_identify::{AddrManager, MisbehaveResult, Misbehavior}; + +#[derive(Clone)] +pub(crate) struct IdentifyAddressManager { + event_sender: mpsc::UnboundedSender, +} + +impl IdentifyAddressManager { + pub(crate) fn new( + event_sender: mpsc::UnboundedSender, + ) -> IdentifyAddressManager { + IdentifyAddressManager { event_sender } + } +} + +impl AddrManager for IdentifyAddressManager { + fn add_listen_addrs(&mut self, peer_id: &PeerId, addrs: Vec) { + let event = IdentifyEvent::AddListenAddrs { + peer_id: peer_id.clone(), + addrs, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + } + } + + fn add_observed_addr(&mut self, peer_id: &PeerId, addr: Multiaddr) -> MisbehaveResult { + let event = IdentifyEvent::AddObservedAddr { + peer_id: peer_id.clone(), + addr, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + } + // NOTE: for future usage + MisbehaveResult::Continue + } + + fn misbehave(&mut self, peer_id: &PeerId, kind: Misbehavior) -> MisbehaveResult { + let (sender, receiver) = oneshot::channel(); + let event = IdentifyEvent::Misbehave { + peer_id: peer_id.clone(), + kind, + result: sender, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + MisbehaveResult::Disconnect + } else { + receiver.wait().unwrap_or(MisbehaveResult::Disconnect) + } + } +} + +pub enum IdentifyEvent { + AddListenAddrs { + peer_id: PeerId, + addrs: Vec, + }, + AddObservedAddr { + peer_id: PeerId, + addr: Multiaddr, + }, + Misbehave { + peer_id: PeerId, + kind: Misbehavior, + result: oneshot::Sender, + }, +} + +pub(crate) struct IdentifyService { + event_receiver: mpsc::UnboundedReceiver, + network: Arc, +} + +impl IdentifyService { + pub(crate) fn new( + network: Arc, + event_receiver: mpsc::UnboundedReceiver, + ) -> IdentifyService { + IdentifyService { + event_receiver, + network, + } + } +} + +impl Stream for IdentifyService { + type Item = (); + type Error = (); + fn poll(&mut self) -> Result>, Self::Error> { + match try_ready!(self.event_receiver.poll()) { + Some(IdentifyEvent::AddListenAddrs { .. }) => { + // TODO: how to transform those addresses + } + Some(IdentifyEvent::AddObservedAddr { .. }) => { + // TODO: how to transform this address + } + Some(IdentifyEvent::Misbehave { result, .. }) => { + // TODO: report misbehave + if result.send(MisbehaveResult::Continue).is_err() { + return Err(()); + } + } + None => { + debug!(target: "network", "identify service shutdown"); + return Ok(Async::Ready(None)); + } + } + Ok(Async::Ready(Some(()))) + } +} From bc4260ec8f0d98eacd48a51b101b39c0991942ef Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Wed, 20 Mar 2019 11:17:11 +0800 Subject: [PATCH 09/18] fix: Add observed address to peer store --- network/Cargo.toml | 4 --- network/src/service/identify_service.rs | 40 +++++++++++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/network/Cargo.toml b/network/Cargo.toml index d44f936757..1abab69d32 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -20,10 +20,6 @@ bytes = "0.4.12" tokio = "0.1.17" futures = { version = "0.1.19", features = ["use_std"] } snap = "0.2" -# p2p = { version = "0.2.0-alpha.1", package="tentacle" } -# secio = { version = "0.1.0", package="tentacle-secio" } -# p2p-ping = { version = "0.2.0", package="tentacle-ping" } -# p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", branch="discovery-use-multiaddr-format", package="tentacle-discovery" } p2p = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle" } secio = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-secio" } p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-ping" } diff --git a/network/src/service/identify_service.rs b/network/src/service/identify_service.rs index 8155e6d1f4..a3891ea231 100644 --- a/network/src/service/identify_service.rs +++ b/network/src/service/identify_service.rs @@ -2,9 +2,14 @@ use crate::Network; use futures::{sync::mpsc, sync::oneshot, Async, Future, Stream}; use log::{debug, warn}; +use std::collections::HashMap; use std::sync::Arc; -use p2p::{multiaddr::Multiaddr, secio::PeerId}; +use p2p::{ + multiaddr::{Multiaddr, Protocol}, + secio::PeerId, + utils::multiaddr_to_socketaddr, +}; pub use p2p_identify::IdentifyProtocol; use p2p_identify::{AddrManager, MisbehaveResult, Misbehavior}; @@ -80,6 +85,7 @@ pub enum IdentifyEvent { pub(crate) struct IdentifyService { event_receiver: mpsc::UnboundedReceiver, network: Arc, + listen_addrs: HashMap>, } impl IdentifyService { @@ -90,6 +96,7 @@ impl IdentifyService { IdentifyService { event_receiver, network, + listen_addrs: HashMap::default(), } } } @@ -99,11 +106,34 @@ impl Stream for IdentifyService { type Error = (); fn poll(&mut self) -> Result>, Self::Error> { match try_ready!(self.event_receiver.poll()) { - Some(IdentifyEvent::AddListenAddrs { .. }) => { - // TODO: how to transform those addresses + Some(IdentifyEvent::AddListenAddrs { peer_id, addrs }) => { + self.listen_addrs.insert(peer_id, addrs); } - Some(IdentifyEvent::AddObservedAddr { .. }) => { - // TODO: how to transform this address + Some(IdentifyEvent::AddObservedAddr { peer_id, addr }) => { + // TODO: how to use listen addresses + if let Some(addr) = self + .listen_addrs + .get(&peer_id) + .map(|addrs| addrs.iter().next()) + .unwrap_or(None) + .map(|addr| multiaddr_to_socketaddr(addr)) + .unwrap_or(None) + .map(|socket_addr| socket_addr.port()) + .map(move |port| { + addr.into_iter() + .map(|proto| match proto { + Protocol::Tcp(_) => Protocol::Tcp(port), + value => value, + }) + .collect() + }) + { + let _ = self + .network + .peer_store() + .write() + .add_discovered_address(&peer_id, addr); + } } Some(IdentifyEvent::Misbehave { result, .. }) => { // TODO: report misbehave From 9492854748764df0d6c496ebf7c25d6e0658ae4f Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Wed, 20 Mar 2019 13:25:20 +0800 Subject: [PATCH 10/18] fix: Parse protocol version bug --- network/src/protocol.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/network/src/protocol.rs b/network/src/protocol.rs index 4029f77f9d..9372bc3e9f 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -92,8 +92,12 @@ impl ServiceProtocol for CKBHandler { fn init(&mut self, _control: &mut ServiceContext) {} fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, version: &str) { let (peer_id, version) = { - let parsed_version = version.parse::(); - if session.remote_pubkey.is_none() || parsed_version.is_err() { + // FIXME: version number should be discussed. + let parsed_version = version + .split(".") + .last() + .and_then(|v| v.parse::().ok()); + if session.remote_pubkey.is_none() || parsed_version.is_none() { error!(target: "network", "ckb protocol connected error, addr: {}, protocol:{}, version: {}", session.address, self.id, version); control.disconnect(session.id); return; From cbae925d261a52708e97494c1c73a3134a1cbaed Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Wed, 20 Mar 2019 15:27:40 +0800 Subject: [PATCH 11/18] test: Fix build test (network API change) --- network/src/protocol.rs | 5 +---- sync/src/tests/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/network/src/protocol.rs b/network/src/protocol.rs index 9372bc3e9f..b2c730a83c 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -93,10 +93,7 @@ impl ServiceProtocol for CKBHandler { fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, version: &str) { let (peer_id, version) = { // FIXME: version number should be discussed. - let parsed_version = version - .split(".") - .last() - .and_then(|v| v.parse::().ok()); + let parsed_version = version.split('.').last().and_then(|v| v.parse::().ok()); if session.remote_pubkey.is_none() || parsed_version.is_none() { error!(target: "network", "ckb protocol connected error, addr: {}, protocol:{}, version: {}", session.address, self.id, version); control.disconnect(session.id); diff --git a/sync/src/tests/mod.rs b/sync/src/tests/mod.rs index 5f80c262d6..32a2c8ff2e 100644 --- a/sync/src/tests/mod.rs +++ b/sync/src/tests/mod.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use ckb_network::{ errors::Error as NetworkError, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ProtocolId, SessionInfo, Severity, TimerToken, @@ -101,7 +102,7 @@ impl TestNode { timer_senders: self.timer_senders.clone(), }), *peer, - payload.clone(), + Bytes::from(payload.clone()), ) }; From 15b69b0d8804bdadd963c0ecc9be928e801fd858 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 11:44:20 +0800 Subject: [PATCH 12/18] fix: Outbound connection select address bug --- network/src/network.rs | 5 ++--- network/src/service/discovery_service.rs | 20 +++++++++++++------- network/src/service/identify_service.rs | 6 ++---- network/src/service/outbound_peer_service.rs | 20 ++++++++++---------- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/network/src/network.rs b/network/src/network.rs index 5a503017e4..29fd8b93ec 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -328,15 +328,14 @@ impl Network { } } - pub fn dial(&self, expected_peer_id: &PeerId, mut addr: Multiaddr) { + pub fn dial(&self, expected_peer_id: &PeerId, addr: Multiaddr) { if expected_peer_id == self.local_peer_id() { debug!(target: "network", "ignore dial to self"); return; } debug!(target: "network", "dial to peer {:?} address {:?}", expected_peer_id, addr); match Multihash::from_bytes(expected_peer_id.as_bytes().to_vec()) { - Ok(peer_id_hash) => { - addr.append(multiaddr::Protocol::P2p(peer_id_hash)); + Ok(_peer_id_hash) => { self.dial_addr(addr); } Err(err) => { diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index 7a872a9321..fbcef16c40 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -10,6 +10,7 @@ use p2p::{ multiaddr::Multiaddr, secio::PeerId, traits::ServiceProtocol, + utils::extract_peer_id, yamux::session::SessionType, ProtocolId, SessionId, }; @@ -207,12 +208,17 @@ impl Stream for DiscoveryService { // NOTE: ignore add new addr message, handle this in identify protocol } Some(DiscoveryEvent::AddNewAddrs { session_id, addrs }) => { - if let Some(peer_id) = self.sessions.get(&session_id) { - let _ = self - .network - .peer_store() - .write() - .add_discovered_addresses(peer_id, addrs); + if let Some(_peer_id) = self.sessions.get(&session_id) { + // TODO: wait for peer store update + for addr in addrs.into_iter() { + if let Some(peer_id) = extract_peer_id(&addr) { + let _ = self + .network + .peer_store() + .write() + .add_discovered_address(&peer_id, addr); + } + } } } Some(DiscoveryEvent::Misbehave { @@ -227,7 +233,7 @@ impl Stream for DiscoveryService { .network .peer_store() .read() - .peers_to_attempt(n as u32) + .random_peers(n as u32) .into_iter() .map(|(_peer_id, addr)| addr) .collect(); diff --git a/network/src/service/identify_service.rs b/network/src/service/identify_service.rs index a3891ea231..04bde659d1 100644 --- a/network/src/service/identify_service.rs +++ b/network/src/service/identify_service.rs @@ -114,10 +114,8 @@ impl Stream for IdentifyService { if let Some(addr) = self .listen_addrs .get(&peer_id) - .map(|addrs| addrs.iter().next()) - .unwrap_or(None) - .map(|addr| multiaddr_to_socketaddr(addr)) - .unwrap_or(None) + .and_then(|addrs| addrs.iter().next()) + .and_then(|addr| multiaddr_to_socketaddr(addr)) .map(|socket_addr| socket_addr.port()) .map(move |port| { addr.into_iter() diff --git a/network/src/service/outbound_peer_service.rs b/network/src/service/outbound_peer_service.rs index 4f3906ff7f..b068cfc4a6 100644 --- a/network/src/service/outbound_peer_service.rs +++ b/network/src/service/outbound_peer_service.rs @@ -1,6 +1,6 @@ use crate::Network; use futures::{Async, Stream}; -use log::{debug, error}; +use log::{debug, warn}; use std::sync::Arc; use std::time::Duration; use std::usize; @@ -37,20 +37,20 @@ impl Stream for OutboundPeerService { .network .peer_store() .read() - .peers_to_attempt(new_outbound as u32); - for (peer_id, addr) in attempt_peers.iter().filter_map(|(peer_id, addr)| { - if self.network.local_peer_id() != peer_id { - Some((peer_id.clone(), addr.clone())) - } else { - None - } - }) { + .random_peers(new_outbound as u32) + .into_iter() + .filter(|(peer_id, _addr)| { + self.network.get_peer_index(peer_id).is_none() + && self.network.local_peer_id() != peer_id + }) + .collect::>(); + for (peer_id, addr) in attempt_peers.into_iter() { self.network.dial(&peer_id, addr); } } } None => { - error!(target: "network", "ckb outbound peer service stopped"); + warn!(target: "network", "ckb outbound peer service stopped"); return Ok(Async::Ready(None)); } } From 7650a8f78068d005327608ef4ca9348a81b132a5 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 11:44:38 +0800 Subject: [PATCH 13/18] fix: Can not shutdown network bug --- network/src/network_service.rs | 76 ++++++++++++++++++---------------- src/cli/run_impl.rs | 1 + 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/network/src/network_service.rs b/network/src/network_service.rs index afbdfd79f4..30bcb497d4 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -9,27 +9,34 @@ use ckb_util::Mutex; use futures::future::Future; use futures::sync::mpsc::Receiver; use futures::sync::oneshot; -use log::{debug, info}; +use log::{debug, error, info}; use std::sync::Arc; -use std::thread; use tokio::runtime; pub struct StopHandler { signal: oneshot::Sender<()>, - thread: thread::JoinHandle<()>, + network_runtime: runtime::Runtime, } impl StopHandler { - pub fn new(signal: oneshot::Sender<()>, thread: thread::JoinHandle<()>) -> StopHandler { - StopHandler { signal, thread } + pub fn new(signal: oneshot::Sender<()>, network_runtime: runtime::Runtime) -> StopHandler { + StopHandler { + signal, + network_runtime, + } } pub fn close(self) { - let StopHandler { signal, thread } = self; + let StopHandler { + signal, + network_runtime, + } = self; if let Err(e) = signal.send(()) { debug!(target: "network", "send shutdown signal error, ignoring error: {:?}", e) }; - thread.join().expect("join network_service thread"); + // TODO: not that gracefully shutdown, will output below error message: + // "terminate called after throwing an instance of 'std::system_error'" + network_runtime.shutdown_now(); } } @@ -71,38 +78,35 @@ impl NetworkService { Network::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let (init_tx, init_rx) = oneshot::channel(); - let join_handle = thread::spawn({ - let network = Arc::clone(&network); - let config = config.clone(); - move || { - info!( - target: "network", - "network peer_id {:?}", - network.local_public_key().peer_id() - ); - let network_future = Network::build_network_future( - network, - &config, - close_rx, - p2p_service, - timer_registry, - ckb_event_receiver, - receivers, - ) - .expect("Network thread init"); - init_tx.send(()).expect("Network init signal send"); - // here we use default config - let network_runtime = runtime::Runtime::new().expect("Network tokio runtime init");; - match network_runtime.block_on_all(network_future) { - Ok(_) => info!(target: "network", "network service exit"), - Err(err) => panic!("network service exit unexpected {}", err), - } - } - }); + + info!( + target: "network", + "network peer_id {:?}", + network.local_public_key().peer_id() + ); + let network_future = Network::build_network_future( + Arc::clone(&network), + &config, + close_rx, + p2p_service, + timer_registry, + ckb_event_receiver, + receivers, + ) + .expect("Network thread init"); + init_tx.send(()).expect("Network init signal send"); + // here we use default config + let mut network_runtime = runtime::Runtime::new().expect("Network tokio runtime init"); + network_runtime.spawn( + network_future + .map(|_| info!(target: "network", "network service exit")) + .map_err(|err| error!("network service exit unexpected {}", err)), + ); + init_rx.wait().map_err(|_err| Error::Shutdown)?; Ok(NetworkService { network, - stop_handler: Mutex::new(Some(StopHandler::new(close_tx, join_handle))), + stop_handler: Mutex::new(Some(StopHandler::new(close_tx, network_runtime))), }) } diff --git a/src/cli/run_impl.rs b/src/cli/run_impl.rs index bf3218659d..6c43493cc7 100644 --- a/src/cli/run_impl.rs +++ b/src/cli/run_impl.rs @@ -106,6 +106,7 @@ pub fn run(setup: Setup) { rpc_server.close(); info!(target: "main", "Jsonrpc shutdown"); + // FIXME: should gracefully shutdown network network.close(); info!(target: "main", "Network shutdown"); } From 3aa0bc878dbc39f752b3a3d26d2c049068b64115 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 15:52:36 +0800 Subject: [PATCH 14/18] fix: Fix dial address with peer_id --- network/src/network.rs | 5 +++-- network/src/service/discovery_service.rs | 9 ++++++++- network/src/service/identify_service.rs | 8 +++++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/network/src/network.rs b/network/src/network.rs index 29fd8b93ec..5a503017e4 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -328,14 +328,15 @@ impl Network { } } - pub fn dial(&self, expected_peer_id: &PeerId, addr: Multiaddr) { + pub fn dial(&self, expected_peer_id: &PeerId, mut addr: Multiaddr) { if expected_peer_id == self.local_peer_id() { debug!(target: "network", "ignore dial to self"); return; } debug!(target: "network", "dial to peer {:?} address {:?}", expected_peer_id, addr); match Multihash::from_bytes(expected_peer_id.as_bytes().to_vec()) { - Ok(_peer_id_hash) => { + Ok(peer_id_hash) => { + addr.append(multiaddr::Protocol::P2p(peer_id_hash)); self.dial_addr(addr); } Err(err) => { diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index fbcef16c40..1e17b52e44 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use p2p::{ context::{ServiceContext, SessionContext}, - multiaddr::Multiaddr, + multiaddr::{Multiaddr, Protocol}, secio::PeerId, traits::ServiceProtocol, utils::extract_peer_id, @@ -212,6 +212,13 @@ impl Stream for DiscoveryService { // TODO: wait for peer store update for addr in addrs.into_iter() { if let Some(peer_id) = extract_peer_id(&addr) { + let addr = addr + .into_iter() + .filter(|proto| match proto { + Protocol::P2p(_) => false, + _ => true, + }) + .collect::(); let _ = self .network .peer_store() diff --git a/network/src/service/identify_service.rs b/network/src/service/identify_service.rs index 04bde659d1..60505af9da 100644 --- a/network/src/service/identify_service.rs +++ b/network/src/service/identify_service.rs @@ -119,9 +119,11 @@ impl Stream for IdentifyService { .map(|socket_addr| socket_addr.port()) .map(move |port| { addr.into_iter() - .map(|proto| match proto { - Protocol::Tcp(_) => Protocol::Tcp(port), - value => value, + .filter_map(|proto| match proto { + Protocol::Tcp(_) => Some(Protocol::Tcp(port)), + // Remove p2p part + Protocol::P2p(_) => None, + value => Some(value), }) .collect() }) From 1981533209321a66c1470c2d1ad90337192baab7 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 16:06:38 +0800 Subject: [PATCH 15/18] fix: Fix parse supported versions --- network/src/protocol.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/network/src/protocol.rs b/network/src/protocol.rs index b2c730a83c..95186ac14f 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -55,8 +55,14 @@ impl CKBProtocol { pub fn build(&self) -> ProtocolMeta { let event_sender = self.event_sender.clone(); + let supported_versions = self + .supported_versions + .iter() + .map(|v| v.to_string()) + .collect::>(); MetaBuilder::default() .id(self.id) + .support_versions(supported_versions) .service_handle(move || { ProtocolHandle::Callback(Box::new(CKBHandler { id: self.id, @@ -92,8 +98,8 @@ impl ServiceProtocol for CKBHandler { fn init(&mut self, _control: &mut ServiceContext) {} fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, version: &str) { let (peer_id, version) = { - // FIXME: version number should be discussed. - let parsed_version = version.split('.').last().and_then(|v| v.parse::().ok()); + // TODO: version number should be discussed. + let parsed_version = version.parse::().ok(); if session.remote_pubkey.is_none() || parsed_version.is_none() { error!(target: "network", "ckb protocol connected error, addr: {}, protocol:{}, version: {}", session.address, self.id, version); control.disconnect(session.id); From 5461d5e752695fbe1b49bde24ff59a9538ae9095 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 16:19:25 +0800 Subject: [PATCH 16/18] chore: Remove unnecessary clone --- network/src/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/src/network.rs b/network/src/network.rs index 5a503017e4..c6f574e434 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -399,7 +399,7 @@ impl Network { .service_handle(move || { ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new( DISCOVERY_PROTOCOL_ID, - disc_sender.clone(), + disc_sender, ))) }) .build(); From 4a068b61e4c5b134ef44dd385b03e2bc5d852bc7 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 17:14:36 +0800 Subject: [PATCH 17/18] chore: Remove AddNewAddr event --- network/src/service/discovery_service.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index 1e17b52e44..d2dcc56453 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -150,10 +150,6 @@ pub enum DiscoveryEvent { peer_id: Option, }, Disconnected(SessionId), - AddNewAddr { - session_id: SessionId, - addr: Multiaddr, - }, AddNewAddrs { session_id: SessionId, addrs: Vec, @@ -204,9 +200,6 @@ impl Stream for DiscoveryService { Some(DiscoveryEvent::Disconnected(session_id)) => { self.sessions.remove(&session_id); } - Some(DiscoveryEvent::AddNewAddr { .. }) => { - // NOTE: ignore add new addr message, handle this in identify protocol - } Some(DiscoveryEvent::AddNewAddrs { session_id, addrs }) => { if let Some(_peer_id) = self.sessions.get(&session_id) { // TODO: wait for peer store update @@ -262,12 +255,7 @@ pub struct DiscoveryAddressManager { } impl AddressManager for DiscoveryAddressManager { - fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) { - let event = DiscoveryEvent::AddNewAddr { session_id, addr }; - if self.event_sender.unbounded_send(event).is_err() { - warn!(target: "network", "receiver maybe dropped!"); - } - } + fn add_new_addr(&mut self, _session_id: SessionId, _addr: Multiaddr) {} fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; From 3999aee5d5b2bb333c2d4a25af160430900a8666 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 21 Mar 2019 17:16:01 +0800 Subject: [PATCH 18/18] revert: Revert out bounded service logic --- network/src/service/outbound_peer_service.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/network/src/service/outbound_peer_service.rs b/network/src/service/outbound_peer_service.rs index b068cfc4a6..c9cb5ae247 100644 --- a/network/src/service/outbound_peer_service.rs +++ b/network/src/service/outbound_peer_service.rs @@ -37,14 +37,11 @@ impl Stream for OutboundPeerService { .network .peer_store() .read() - .random_peers(new_outbound as u32) + .peers_to_attempt(new_outbound as u32); + for (peer_id, addr) in attempt_peers .into_iter() - .filter(|(peer_id, _addr)| { - self.network.get_peer_index(peer_id).is_none() - && self.network.local_peer_id() != peer_id - }) - .collect::>(); - for (peer_id, addr) in attempt_peers.into_iter() { + .filter(|(peer_id, _addr)| self.network.local_peer_id() != peer_id) + { self.network.dial(&peer_id, addr); } }