From d85c7c2b4da96801433fd7d35b3096bcba82d849 Mon Sep 17 00:00:00 2001 From: Marek Franciszkiewicz Date: Fri, 26 Jun 2020 08:30:43 +0200 Subject: [PATCH] GFTP: JSON RPC server mode (#367) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Przemysław Rekucki <56750+prekucki@users.noreply.github.com> --- Cargo.lock | 437 ++++++++++++++------------- core/gftp/Cargo.toml | 14 +- core/gftp/examples/gftp-server.rs | 230 ++++++++++++++ core/gftp/readme.md | 75 ++++- core/gftp/src/bin/gftp.rs | 187 +++++++++--- core/gftp/src/gftp.rs | 44 ++- core/gftp/src/lib.rs | 3 +- core/gftp/src/rpc.rs | 223 ++++++++++++++ service-bus/bus/src/local_router.rs | 29 +- service-bus/bus/src/remote_router.rs | 7 +- service-bus/bus/src/typed.rs | 5 + 11 files changed, 969 insertions(+), 285 deletions(-) create mode 100644 core/gftp/examples/gftp-server.rs create mode 100644 core/gftp/src/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index ac02ad810c..86f70dbb9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,9 +10,9 @@ dependencies = [ "actix-rt", "actix_derive 0.5.0", "bitflags 1.2.1", - "bytes 0.5.4", + "bytes 0.5.5", "crossbeam-channel", - "derive_more 0.99.7", + "derive_more 0.99.8", "futures 0.3.5", "lazy_static", "log 0.4.8", @@ -32,7 +32,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09e55f0a5c2ca15795035d90c46bd0e73a5123b72f68f12596d6ba5282051380" dependencies = [ "bitflags 1.2.1", - "bytes 0.5.4", + "bytes 0.5.5", "futures-core", "futures-sink", "log 0.4.8", @@ -50,7 +50,7 @@ dependencies = [ "actix-rt", "actix-service", "actix-utils", - "derive_more 0.99.7", + "derive_more 0.99.8", "either", "futures 0.3.5", "http 0.2.1", @@ -69,8 +69,8 @@ dependencies = [ "actix-service", "actix-web", "bitflags 1.2.1", - "bytes 0.5.4", - "derive_more 0.99.7", + "bytes 0.5.5", + "derive_more 0.99.8", "futures-core", "futures-util", "log 0.4.8", @@ -95,10 +95,10 @@ dependencies = [ "base64 0.11.0", "bitflags 1.2.1", "brotli2", - "bytes 0.5.4", + "bytes 0.5.5", "chrono", "copyless", - "derive_more 0.99.7", + "derive_more 0.99.8", "either", "encoding_rs", "failure", @@ -133,8 +133,8 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a60f9ba7c4e6df97f3aacb14bb5c0cd7d98a49dcbaed0d7f292912ad9a6a3ed2" dependencies = [ - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -215,7 +215,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91164716d956745c79dcea5e66d2aa04506549958accefcede5368c70f2fd4ff" dependencies = [ - "derive_more 0.99.7", + "derive_more 0.99.8", "futures-channel", "lazy_static", "log 0.4.8", @@ -234,7 +234,7 @@ dependencies = [ "actix-rt", "actix-service", "actix-utils", - "derive_more 0.99.7", + "derive_more 0.99.8", "either", "futures 0.3.5", "log 0.4.8", @@ -250,7 +250,7 @@ dependencies = [ "actix-rt", "actix-service", "bitflags 1.2.1", - "bytes 0.5.4", + "bytes 0.5.5", "either", "futures 0.3.5", "log 0.4.8", @@ -277,8 +277,8 @@ dependencies = [ "actix-utils", "actix-web-codegen", "awc", - "bytes 0.5.4", - "derive_more 0.99.7", + "bytes 0.5.5", + "derive_more 0.99.8", "encoding_rs", "futures 0.3.5", "fxhash", @@ -301,8 +301,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a71bf475cbe07281d0b3696abb48212db118e7e23219f13596ce865235ff5766" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -314,7 +314,7 @@ dependencies = [ "actix-service", "actix-web", "base64 0.11.0", - "bytes 0.5.4", + "bytes 0.5.5", "futures 0.3.5", ] @@ -337,24 +337,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b95aceadaf327f18f0df5962fedc1bde2f870566a0b9f65c89508a3b1f79334c" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] name = "addr2line" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a49806b9dadc843c61e7c97e72490ad7f7220ae249012fbda9ad0609457c0543" +checksum = "602d785912f476e480434627e8732e6766b760c045bbf897d9dfaa9f4fbd399c" dependencies = [ "gimli", ] [[package]] name = "adler32" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" +checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" [[package]] name = "aes" @@ -403,9 +403,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.10" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" +checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" dependencies = [ "memchr", ] @@ -438,9 +438,9 @@ dependencies = [ [[package]] name = "arc-swap" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b585a98a234c46fc563103e9278c9391fde1f4e6850334da895d27edb9580f62" +checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" [[package]] name = "arrayref" @@ -486,19 +486,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] name = "async-trait" -version = "0.1.33" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1c13101a3224fb178860ae372a031ce350bbd92d39968518f016744dde0bf7" +checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -535,8 +535,8 @@ dependencies = [ "actix-rt", "actix-service", "base64 0.11.0", - "bytes 0.5.4", - "derive_more 0.99.7", + "bytes 0.5.5", + "derive_more 0.99.8", "futures-core", "log 0.4.8", "mime 0.3.16", @@ -549,13 +549,14 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.48" +version = "0.3.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0df2f85c8a2abbe3b7d7e748052fdd9b76a0458fdeb16ad4223f5eca78c7c130" +checksum = "05100821de9e028f12ae3d189176b41ee198341eb8f369956407fea2f5cc666c" dependencies = [ "addr2line", "cfg-if", "libc", + "miniz_oxide", "object", "rustc-demangle", ] @@ -585,17 +586,6 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" -[[package]] -name = "bigdecimal" -version = "0.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73d2953c59932d4a0ac941bb5ccca17e324ddae9aa487353dc22f0aade2c4b4" -dependencies = [ - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "bigdecimal" version = "0.1.2" @@ -750,9 +740,12 @@ dependencies = [ [[package]] name = "bytes" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +checksum = "118cf036fbb97d0816e3c34b2d7a1e8cfc60f68fcf63d550ddbe9bd5f59c213b" +dependencies = [ + "loom", +] [[package]] name = "bytestring" @@ -760,7 +753,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7c05fa5172da78a62d9949d662d2ac89d4cc7355d7b49adee5163f1fb3f363" dependencies = [ - "bytes 0.5.4", + "bytes 0.5.5", ] [[package]] @@ -789,9 +782,9 @@ dependencies = [ [[package]] name = "chunked_transfer" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b89647f09b9f4c838cb622799b2843e4e13bff64661dab9a0362bb92985addd" +checksum = "1d29eb15132782371f71da8f947dba48b3717bdb6fa771b9b434d645e40a7193" [[package]] name = "clap" @@ -931,12 +924,13 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if", "crossbeam-utils", + "maybe-uninit", ] [[package]] @@ -1027,9 +1021,9 @@ dependencies = [ "fnv", "ident_case", "proc-macro2 1.0.18", - "quote 1.0.6", + "quote 1.0.7", "strsim 0.9.3", - "syn 1.0.30", + "syn 1.0.33", ] [[package]] @@ -1039,8 +1033,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" dependencies = [ "darling_core", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -1059,13 +1053,13 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.7" +version = "0.99.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2127768764f1556535c01b5326ef94bd60ff08dcfbdc544d53e69ed155610f5d" +checksum = "bc655351f820d774679da6cdc23355a93de496867d8203496675162e17b1d671" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -1081,11 +1075,11 @@ dependencies = [ [[package]] name = "diesel" -version = "1.4.4" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d7ca63eb2efea87a7f56a283acc49e2ce4b2bd54adf7465dc1d81fef13d8fc" +checksum = "3e2de9deab977a153492a1468d1b1c0662c1cf39e5ea87d0c060ecd59ef18d8c" dependencies = [ - "bigdecimal 0.0.15", + "bigdecimal", "byteorder", "chrono", "diesel_derives", @@ -1100,8 +1094,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -1155,11 +1149,10 @@ dependencies = [ [[package]] name = "dirs-sys" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b" +checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a" dependencies = [ - "cfg-if", "libc", "redox_users", "winapi 0.3.8", @@ -1184,9 +1177,9 @@ checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" [[package]] name = "dtoa" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" [[package]] name = "either" @@ -1287,8 +1280,8 @@ checksum = "bc4bfcfacb61d231109d1d55202c1f33263319668b168843e02ad4652725ec9c" dependencies = [ "heck", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -1450,9 +1443,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", - "synstructure 0.12.3", + "quote 1.0.7", + "syn 1.0.33", + "synstructure 0.12.4", ] [[package]] @@ -1494,9 +1487,9 @@ dependencies = [ [[package]] name = "flexi_logger" -version = "0.15.2" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdd4de217664c0ebd05d54e722f42c190fdf31c8965a77fcfac9253ff61a0369" +checksum = "56705e43071295961742aa1dc19943e1340d89e33aaa8beb15a700f9f1637959" dependencies = [ "chrono", "glob", @@ -1621,8 +1614,8 @@ checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" dependencies = [ "proc-macro-hack", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -1670,6 +1663,19 @@ dependencies = [ "byteorder", ] +[[package]] +name = "generator" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add72f17bb81521258fcc8a7a3245b1e184e916bfbe34f0ea89558f440df5c68" +dependencies = [ + "cc", + "libc", + "log 0.4.8", + "rustc_version", + "winapi 0.3.8", +] + [[package]] name = "generic-array" version = "0.9.0" @@ -1701,7 +1707,7 @@ dependencies = [ [[package]] name = "gftp" -version = "0.1.0" +version = "0.1.1" dependencies = [ "actix-rt", "anyhow", @@ -1711,8 +1717,13 @@ dependencies = [ "futures 0.3.5", "log 0.4.8", "rand 0.7.3", + "serde", + "serde_json", "sha3", "structopt", + "tempdir", + "thiserror", + "tokio 0.2.21", "url 2.1.1", "ya-client-model", "ya-core-model", @@ -1759,7 +1770,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff" dependencies = [ - "bytes 0.5.4", + "bytes 0.5.5", "fnv", "futures-core", "futures-sink", @@ -1792,9 +1803,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91780f809e750b0a89f5544be56617ff6b1227ee485bcb06ebe10cdf89bd3b71" +checksum = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909" dependencies = [ "libc", ] @@ -1853,7 +1864,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes 0.5.4", + "bytes 0.5.5", "fnv", "itoa", ] @@ -1887,9 +1898,9 @@ dependencies = [ [[package]] name = "humantime" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b6c53306532d3c8e8087b44e6580e10db51a023cf9b433cea2ac38066b92da" +checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a" [[package]] name = "hyper" @@ -2049,9 +2060,9 @@ dependencies = [ [[package]] name = "itoa" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" +checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" [[package]] name = "jsonrpc-core" @@ -2204,8 +2215,19 @@ checksum = "06da09a8b91f6c576b9575b326beae59be1df52967f2d4e5e8af67f2c458e737" dependencies = [ "darling", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", +] + +[[package]] +name = "loom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ecc775857611e1df29abba5c41355cdf540e7e9d4acfdf0f355eefee82330b7" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", ] [[package]] @@ -2267,8 +2289,8 @@ checksum = "9753f12909fd8d923f75ae5c3258cae1ed3c8ec052e1b38c93c21a6d157f789c" dependencies = [ "migrations_internals", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -2298,9 +2320,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" +checksum = "791daaae1ed6889560f8c4359194f56648355540573244a5448a83ba1ecc7435" dependencies = [ "adler32", ] @@ -2332,7 +2354,7 @@ checksum = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3" dependencies = [ "log 0.4.8", "mio", - "miow 0.3.4", + "miow 0.3.5", "winapi 0.3.8", ] @@ -2361,9 +2383,9 @@ dependencies = [ [[package]] name = "miow" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22dfdd1d51b2639a5abd17ed07005c3af05fb7a2a3b1a1d0d7af1000a520c1c7" +checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e" dependencies = [ "socket2", "winapi 0.3.8", @@ -2456,9 +2478,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" dependencies = [ "autocfg 1.0.0", "num-traits", @@ -2466,9 +2488,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" dependencies = [ "autocfg 1.0.0", ] @@ -2485,9 +2507,9 @@ dependencies = [ [[package]] name = "object" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cbca9424c482ee628fa549d9c812e2cd22f1180b9222c9200fdfa6eb31aecb2" +checksum = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5" [[package]] name = "ole32-sys" @@ -2533,18 +2555,18 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-src" -version = "111.9.0+1.1.1g" +version = "111.10.0+1.1.1g" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2dbe10ddd1eb335aba3780eb2eaa13e1b7b441d2562fd962398740927f39ec4" +checksum = "47cd4a96d49c3abf4cac8e8a80cba998a030c75608f158fb1c5f609772f265e6" dependencies = [ "cc", ] [[package]] name = "openssl-sys" -version = "0.9.57" +version = "0.9.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7410fef80af8ac071d4f63755c0ab89ac3df0fd1ea91f1d1f37cf5cec4395990" +checksum = "a842db4709b604f0fe5d1170ae3565899be2ad3d9cbc72dedc789ac0511f78de" dependencies = [ "autocfg 1.0.0", "cc", @@ -2736,29 +2758,29 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.19" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a1acf4a3e70849f8a673497ef984f043f95d2d8252dcdf74d54e6a1e47e8a" +checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.19" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194e88048b71a3e02eb4ee36a6995fed9b8236c11a7bb9f7247a9d9835b3f265" +checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] name = "pin-project-lite" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9df32da11d84f3a7d70205549562966279adb900e080fad3dccd8e64afccf0ad" +checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" [[package]] name = "pin-utils" @@ -2813,8 +2835,8 @@ checksum = "98e9e4b82e0ef281812565ea4751049f1bdcdfccda7d3f459f2e138a40c08678" dependencies = [ "proc-macro-error-attr", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", "version_check 0.9.2", ] @@ -2825,8 +2847,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f5444ead4e9935abd7f27dc51f7e852a0569ac888096d5ec2499470794e2e53" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", "syn-mid", "version_check 0.9.2", ] @@ -2839,9 +2861,9 @@ checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" [[package]] name = "proc-macro-nested" -version = "0.1.4" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" [[package]] name = "proc-macro2" @@ -2858,7 +2880,7 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" dependencies = [ - "unicode-xid 0.2.0", + "unicode-xid 0.2.1", ] [[package]] @@ -2969,9 +2991,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea" +checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" dependencies = [ "proc-macro2 1.0.18", ] @@ -3225,9 +3247,9 @@ checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" [[package]] name = "remove_dir_all" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ "winapi 0.3.8", ] @@ -3505,38 +3527,38 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.111" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9124df5b40cbd380080b2cc6ab894c040a3070d995f5c9dc77e18c34a8ae37d" +checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3" dependencies = [ "serde_derive", ] [[package]] name = "serde_bytes" -version = "0.11.4" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bf487fbf5c6239d7ea2ff8b10cb6b811cd4b5080d1c2aeed1dec18753c06e10" +checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" dependencies = [ "serde", ] [[package]] name = "serde_derive" -version = "1.0.111" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2c3ac8e6ca1e9c80b8be1023940162bf81ae3cffbb1809474152f2ce1eb250" +checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] name = "serde_json" -version = "1.0.53" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +checksum = "ec2c5d7e739bc07a3e73381a39d61fdb5f671c60c1df26a130690665803d8226" dependencies = [ "itoa", "ryu", @@ -3557,9 +3579,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.12" +version = "0.8.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16c7a592a1ec97c9c1c68d75b6e537dcbf60c7618e038e7841e00af1d9ccf0c4" +checksum = "ae3e2dd40a7cdc18ca80db804b7f461a39bb721160a85c9a1fa30134bf3c02a5" dependencies = [ "dtoa", "linked-hash-map", @@ -3585,8 +3607,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d08338d8024b227c62bd68a12c7c9883f5c66780abaef15c550dc56f46ee6515" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -3660,9 +3682,9 @@ checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" [[package]] name = "signal-hook" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ff2db2112d6c761e12522c65f7768548bd6e8cd23d2a9dae162520626629bd6" +checksum = "604508c1418b99dfe1925ca9224829bb2a8a9a04dda655cc01fcad46f4ab05ed" dependencies = [ "futures 0.1.29", "libc", @@ -3764,9 +3786,9 @@ checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" [[package]] name = "structopt" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "863246aaf5ddd0d6928dfeb1a9ca65f505599e4e1b399935ef7e75107516b4ef" +checksum = "de2f5e239ee807089b62adce73e48c625e0ed80df02c7ab3f068f5db5281065c" dependencies = [ "clap", "lazy_static", @@ -3775,15 +3797,15 @@ dependencies = [ [[package]] name = "structopt-derive" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d239ca4b13aee7a2142e6795cbd69e457665ff8037aed33b3effdc430d2f927a" +checksum = "510413f9de616762a4fbeab62509bf15c729603b72d7cd71280fbca431b1c118" dependencies = [ "heck", "proc-macro-error", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -3800,8 +3822,8 @@ checksum = "5e6e163a520367c465f59e0a61a23cfae3b10b6546d78b6f672a382be79f7110" dependencies = [ "heck", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -3840,13 +3862,13 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.30" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a56fabc59dce20fe48b6c832cc249c713e7ed88fa28b0ee0a3bfcaae5fe4e2" +checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "unicode-xid 0.2.0", + "quote 1.0.7", + "unicode-xid 0.2.1", ] [[package]] @@ -3856,8 +3878,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -3883,14 +3905,14 @@ dependencies = [ [[package]] name = "synstructure" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" +checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", - "unicode-xid 0.2.0", + "quote 1.0.7", + "syn 1.0.33", + "unicode-xid 0.2.1", ] [[package]] @@ -3967,22 +3989,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b13f926965ad00595dd129fa12823b04bbf866e9085ab0a5f2b05b850fbfc344" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "893582086c2f98cde18f906265a65b5030a074b1046c674ae898be6519a7f479" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -4022,6 +4044,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinyvec" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed" + [[package]] name = "tokio" version = "0.1.22" @@ -4052,7 +4080,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ - "bytes 0.5.4", + "bytes 0.5.5", "fnv", "futures-core", "iovec", @@ -4159,8 +4187,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -4312,7 +4340,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" dependencies = [ - "bytes 0.5.4", + "bytes 0.5.5", "futures-core", "futures-sink", "log 0.4.8", @@ -4326,7 +4354,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes 0.5.4", + "bytes 0.5.5", "futures-core", "futures-sink", "log 0.4.8", @@ -4438,11 +4466,11 @@ dependencies = [ [[package]] name = "unicode-normalization" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" +checksum = "6fb19cf769fa8c6a80a162df694621ebeb4dafb606470b2b2fce0be40a98a977" dependencies = [ - "smallvec 1.4.0", + "tinyvec", ] [[package]] @@ -4471,9 +4499,9 @@ checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" [[package]] name = "unicode-xid" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" [[package]] name = "untrusted" @@ -4520,6 +4548,7 @@ dependencies = [ "idna 0.2.0", "matches", "percent-encoding 2.1.0", + "serde", ] [[package]] @@ -4558,8 +4587,8 @@ checksum = "c2ca2a14bc3fc5b64d188b087a7d3a927df87b152e941ccfbc66672e20c467ae" dependencies = [ "nom", "proc-macro2 1.0.18", - "quote 1.0.6", - "syn 1.0.30", + "quote 1.0.7", + "syn 1.0.33", ] [[package]] @@ -4574,9 +4603,9 @@ dependencies = [ [[package]] name = "vcpkg" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55d1e41d56121e07f1e223db0a4def204e45c85425f6a16d462fd07c8d10d74c" +checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" [[package]] name = "vec_map" @@ -4701,9 +4730,9 @@ dependencies = [ [[package]] name = "widestring" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "effc0e4ff8085673ea7b9b2e3c73f6bd4d118810c9009ed8f1e16bd96c331db6" +checksum = "a763e303c0e0f23b0da40888724762e802a8ffefbc22de4127ef42493c2ea68c" [[package]] name = "winapi" @@ -4819,7 +4848,7 @@ source = "git+https://github.com/golemfactory/ya-client.git?rev=074639b2b67c0f6e dependencies = [ "awc", "backtrace", - "bytes 0.5.4", + "bytes 0.5.5", "chrono", "envy", "failure", @@ -4838,7 +4867,7 @@ name = "ya-client-model" version = "0.1.0" source = "git+https://github.com/golemfactory/ya-client.git?rev=074639b2b67c0f6e5e3252dfc4b0a914823ea568#074639b2b67c0f6e5e3252dfc4b0a914823ea568" dependencies = [ - "bigdecimal 0.1.2", + "bigdecimal", "chrono", "diesel", "serde", @@ -4850,7 +4879,7 @@ dependencies = [ name = "ya-core-model" version = "0.1.0" dependencies = [ - "bigdecimal 0.1.2", + "bigdecimal", "chrono", "log 0.4.8", "serde", @@ -4952,7 +4981,7 @@ dependencies = [ "actix-web", "anyhow", "chrono", - "derive_more 0.99.7", + "derive_more 0.99.8", "diesel", "diesel_migrations", "digest 0.8.1", @@ -5043,7 +5072,7 @@ dependencies = [ "actix-web", "anyhow", "base64 0.11.0", - "bigdecimal 0.1.2", + "bigdecimal", "chrono", "diesel", "diesel_migrations", @@ -5086,7 +5115,7 @@ dependencies = [ "actix-rt", "anyhow", "awc", - "bigdecimal 0.1.2", + "bigdecimal", "bitflags 1.2.1", "chrono", "diesel", @@ -5121,7 +5150,7 @@ name = "ya-persistence" version = "0.2.0" dependencies = [ "anyhow", - "bigdecimal 0.1.2", + "bigdecimal", "chrono", "diesel", "dotenv 0.14.1", @@ -5143,15 +5172,15 @@ dependencies = [ "actix-rt", "actix_derive 0.2.0", "anyhow", - "bigdecimal 0.1.2", + "bigdecimal", "chrono", - "derive_more 0.99.7", + "derive_more 0.99.8", "dialoguer", "dotenv 0.15.0", "env_logger 0.7.1", "futures 0.3.5", "futures-util", - "humantime 2.0.0", + "humantime 2.0.1", "libc", "log 0.4.8", "log-derive", @@ -5183,7 +5212,7 @@ dependencies = [ "dotenv 0.15.0", "env_logger 0.7.1", "futures 0.3.5", - "humantime 2.0.0", + "humantime 2.0.1", "log 0.4.8", "serde_json", "structopt", @@ -5203,7 +5232,7 @@ name = "ya-sb-proto" version = "0.1.0" dependencies = [ "bytes 0.4.12", - "bytes 0.5.4", + "bytes 0.5.5", "prost", "prost-build", "serial_test", @@ -5241,7 +5270,7 @@ name = "ya-sb-util" version = "0.1.0" dependencies = [ "bytes 0.4.12", - "bytes 0.5.4", + "bytes 0.5.5", "futures 0.3.5", "pin-project", ] @@ -5281,11 +5310,11 @@ dependencies = [ "log 0.4.8", "proc-macro-hack", "proc-macro2 1.0.18", - "quote 1.0.6", + "quote 1.0.7", "structopt", "strum", "strum_macros", - "syn 1.0.30", + "syn 1.0.33", "ya-service-api-interfaces", ] @@ -5359,7 +5388,7 @@ dependencies = [ "actix-rt", "actix-web", "awc", - "bytes 0.5.4", + "bytes 0.5.5", "env_logger 0.7.1", "futures 0.3.5", "gftp", @@ -5400,7 +5429,7 @@ version = "0.1.0" dependencies = [ "actix", "anyhow", - "derive_more 0.99.7", + "derive_more 0.99.8", "futures 0.3.5", "futures-util", "libc", diff --git a/core/gftp/Cargo.toml b/core/gftp/Cargo.toml index 9889bd555e..587a36bc63 100644 --- a/core/gftp/Cargo.toml +++ b/core/gftp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "gftp" -version = "0.1.0" +version = "0.1.1" authors = ["Golem Factory "] edition = "2018" @@ -21,6 +21,16 @@ env_logger = "0.7.1" futures = "0.3" rand = "0.7.3" structopt = "0.3.9" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" sha3 = "0.8.2" +tokio = { version = "0.2", features = ["io-std", "time"] } +thiserror = "1.0.20" dotenv = "0.15.0" -url = "2.1.1" +url = { version = "2.1.1", features = ["serde"] } + + +[dev-dependencies] +sha3 = "0.8.2" +tokio = { version = "0.2", features = ["process"] } +tempdir = "0.3.7" diff --git a/core/gftp/examples/gftp-server.rs b/core/gftp/examples/gftp-server.rs new file mode 100644 index 0000000000..e7dd37ec10 --- /dev/null +++ b/core/gftp/examples/gftp-server.rs @@ -0,0 +1,230 @@ +use anyhow::{anyhow, Result}; +use futures::future::{FutureExt, LocalBoxFuture}; +use gftp::rpc::*; +use sha3::digest::generic_array::GenericArray; +use sha3::Digest; +use std::ffi::OsString; +use std::fs::OpenOptions; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use std::sync::atomic::{AtomicUsize, Ordering}; +use structopt::StructOpt; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{ChildStdin, ChildStdout, Command}; + +static SEQ: AtomicUsize = AtomicUsize::new(0); +type HashOutput = GenericArray::OutputSize>; + +/// Build the GFTP binary, start the daemon and run: +/// +/// `cargo run --example gftp-server ../../target/debug/gftp Cargo.toml` +#[derive(StructOpt)] +struct Args { + /// Path to GFTP binary + gftp_bin: PathBuf, + /// File to share + share: PathBuf, +} + +trait ReadRpcMessage { + fn read_message(&mut self) -> LocalBoxFuture>; +} + +trait WriteRpcMessage { + fn write_message(&mut self, msg: RpcMessage) -> LocalBoxFuture>; +} + +impl ReadRpcMessage for BufReader { + fn read_message(&mut self) -> LocalBoxFuture> { + async move { + let mut buffer = String::new(); + self.read_line(&mut buffer).await?; + log::info!("[Rx] {}", buffer.trim()); + let msg = serde_json::from_str::(&buffer)?; + Ok(msg) + } + .boxed_local() + } +} + +impl WriteRpcMessage for ChildStdin { + fn write_message(&mut self, msg: RpcMessage) -> LocalBoxFuture> { + async move { + let ser = format!("{}\r\n", serde_json::to_string(&msg)?); + log::info!("[Tx] {}", ser.trim()); + self.write_all(ser.as_bytes()).await?; + self.flush(); + Ok(()) + } + .boxed_local() + } +} + +async fn send( + stdin: &mut ChildStdin, + reader: &mut BufReader, + req: RpcRequest, +) -> Result { + let id = SEQ.fetch_add(1, Ordering::Relaxed) as i64; + let msg = RpcMessage::request(Some(&RpcId::Int(id)), req); + stdin.write_message(msg).await?; + + let res = reader.read_message().await?; + match res.id { + Some(RpcId::Int(v)) => match v == id { + false => return Err(anyhow!("Invalid response ID: {}, expected {}", v, id)), + _ => (), + }, + id => return Err(anyhow!("Invalid response ID: {:?}", id)), + } + + match res.body { + RpcBody::Error { error } => return Err(anyhow!("Request {:?} failed: {:?}", id, error)), + RpcBody::Request { .. } => return Err(anyhow!("Unexpected message: {:?}", res)), + RpcBody::Result { result } => Ok(result), + } +} + +fn hash_file(path: &Path) -> Result { + let mut file_src = OpenOptions::new().read(true).open(path)?; + + let mut hasher = sha3::Sha3_512::default(); + let mut chunk = vec![0; 4096]; + + while let Ok(count) = file_src.read(&mut chunk[..]) { + hasher.input(&chunk[..count]); + if count != 4096 { + break; + } + } + Ok(hasher.result()) +} + +#[actix_rt::main] +async fn main() -> Result<()> { + dotenv::dotenv().ok(); + std::env::set_var( + "RUST_LOG", + std::env::var("RUST_LOG").unwrap_or("info".into()), + ); + env_logger::init(); + + let args = Args::from_args(); + if !args.gftp_bin.exists() { + return Err(anyhow!( + "Gftp binary does not exist: {}", + args.gftp_bin.display() + )); + } + if !args.share.exists() { + return Err(anyhow!( + "Shared file does not exist: {}", + args.gftp_bin.display() + )); + } + + let tmp_dir = tempdir::TempDir::new("gftp-server")?; + let published_hash = hash_file(&args.share)?; + + log::info!("spawning server"); + let mut child = Command::new(args.gftp_bin) + .arg(OsString::from("server")) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + let mut stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + let mut reader = BufReader::new(stdout); + + log::info!("sending version request"); + let req = RpcRequest::Version {}; + send(&mut stdin, &mut reader, req).await?; + + log::info!("sending publish request"); + let files = vec![args.share.clone()]; + let req = RpcRequest::Publish { files }; + let urls = match send(&mut stdin, &mut reader, req).await? { + RpcResult::Files(files) => files.into_iter().map(|r| r.url).collect::>(), + result => return Err(anyhow!("Invalid result: {:?}", result)), + }; + + log::info!("sending close request"); + let req = RpcRequest::Close { urls: urls.clone() }; + match send(&mut stdin, &mut reader, req).await? { + RpcResult::Statuses(vec) => { + if vec.iter().any(|b| b == &RpcStatusResult::Error) { + return Err(anyhow!("Invalid result: {:?}", vec)); + } + } + result => return Err(anyhow!("Invalid result: {:?}", result)), + } + + log::info!("sending erroneous close request"); + let req = RpcRequest::Close { urls }; + match send(&mut stdin, &mut reader, req).await? { + RpcResult::Statuses(vec) => { + if vec.iter().any(|b| b == &RpcStatusResult::Ok) { + return Err(anyhow!("Invalid result: {:?}", vec)); + } + } + result => return Err(anyhow!("Invalid result: {:?}", result)), + } + + log::info!("sending publish request (for download)"); + let files = vec![args.share.clone()]; + let req = RpcRequest::Publish { files }; + let url = match send(&mut stdin, &mut reader, req).await? { + RpcResult::Files(files) => files + .into_iter() + .map(|r| r.url) + .next() + .ok_or_else(|| anyhow!("Missing URL in response"))?, + result => return Err(anyhow!("Invalid result: {:?}", result)), + }; + + log::info!("sending download request"); + let output_file = tmp_dir.path().join("tmp-download"); + let req = RpcRequest::Download { + url, + output_file: output_file.clone(), + }; + send(&mut stdin, &mut reader, req).await?; + + if hash_file(&output_file)? != published_hash { + return Err(anyhow!("Invalid file hash (receive request)")); + } else { + log::info!("file checksum ok"); + } + + log::info!("sending receive request"); + let output_file = tmp_dir.path().join("tmp-receive"); + let req = RpcRequest::Receive { + output_file: output_file.clone(), + }; + let url = match send(&mut stdin, &mut reader, req).await? { + RpcResult::File(file_result) => file_result.url, + result => return Err(anyhow!("Invalid result: {:?}", result)), + }; + + log::info!("sending upload request"); + let req = RpcRequest::Upload { + url, + file: args.share, + }; + send(&mut stdin, &mut reader, req).await?; + + if hash_file(&output_file)? != published_hash { + return Err(anyhow!("Invalid file hash (receive request)")); + } else { + log::info!("file checksum ok"); + } + + log::info!("sending shutdown request"); + let req = RpcRequest::Shutdown {}; + send(&mut stdin, &mut reader, req).await?; + + child.await?; + Ok(()) +} diff --git a/core/gftp/readme.md b/core/gftp/readme.md index 92f1ab6306..e58f2f48d3 100644 --- a/core/gftp/readme.md +++ b/core/gftp/readme.md @@ -1,21 +1,32 @@ # Using gftp transfer binary -## Publishing files: +## Publishing files Start yagna service: -``` +```bash cargo run service run ``` -Publish chosen file. Copy file hash from logs. -``` +Publish a chosen file (blocking). +```bash cargo run -p gftp -- publish {file name} -... -Published file [LICENSE] as gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/edb0016d9f8bafb54540da34f05a8d510de8114488f23916276bdead05509a53. -... ``` -## Downloading files: +Example output: +```json +{"result": [{"file": "Cargo.toml", "url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/39dc05a25ea97a1c90166658d93786f3302a51b8e31eb9b26001b615dea7e773"}]} +``` + +or with `--verbose` (`-v`) +```bash +cargo run -p gftp -- publish {file name} -v +``` + +```json +{"jsonrpc": "2.0", "id": null, "result": [{"file": "Cargo.toml", "url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/39dc05a25ea97a1c90166658d93786f3302a51b8e31eb9b26001b615dea7e773"}]} +``` + +## Downloading a file ``` cargo run -p gftp -- download \ @@ -23,18 +34,52 @@ cargo run -p gftp -- download \ -o workdir/gftp/download.txt ``` -## Uploading file +## Uploading a file -Publish file for upload: +Publish file for upload (blocking): ``` -cargo run -p gftp -- await-upload workdir/gftp-upload/License -... -[2020-03-03T10:17:57Z INFO gftp] Waiting for file upload [workdir/gftp-upload/License] on url [gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/z2IeDvgs1Q1hZ6seR0iSEsKW8kxdxQCK0eoz6DsYVznqJIl5K18NqwJPdLgesY9yR]. -... +cargo run -p gftp -- receive workdir/gftp-upload/License ``` Upload file on provider side: ``` -cargo run --release -p gftp -- upload LICENSE gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/z2IeDvgs1Q1hZ6seR0iSEsKW8kxdxQCK0eoz6DsYVznqJIl5K18NqwJPdLgesY9yR +cargo run -p gftp -- upload LICENSE gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/z2IeDvgs1Q1hZ6seR0iSEsKW8kxdxQCK0eoz6DsYVznqJIl5K18NqwJPdLgesY9yR +``` + +## JSON-RPC 2.0 server + +To start the application in JSON RPC server mode, type: + +``` +cargo run -p gftp -- server ``` + +JSON RPC messages can be sent to application's stdin. **Each JSON object needs to be terminated with a new line** (`\n`). + +### Publish + +```json +{"jsonrpc": "2.0", "id": "1", "method": "publish", "params": {"files": ["Cargo.toml"]}} +``` + +### Download +```json +{"jsonrpc": "2.0", "id": 2, "method": "download", "params": {"url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/1d040d4ea83249ec6b8264305365acf3068e095245ea3981de1c4b16782253cc", "output_file": "/home/me/download.bin"}} +``` + +### AwaitUpload +```json +{"jsonrpc": "2.0", "id": "3", "method": "receive", "params": {"output_file": "/home/me/upload.bin"}} +``` + +### Upload +```json +{"jsonrpc": "2.0", "id": 4, "method": "upload", "params": {"url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/1d040d4ea83249ec6b8264305365acf3068e095245ea3981de1c4b16782253cc", "file": "/etc/passwd"}} +``` + +## Flags + +- `-v`, `--verbose` + + Increases output verbosity to match the one in JSON RPC server mode. diff --git a/core/gftp/src/bin/gftp.rs b/core/gftp/src/bin/gftp.rs index 2d8bf8f910..68b1598a6f 100644 --- a/core/gftp/src/bin/gftp.rs +++ b/core/gftp/src/bin/gftp.rs @@ -1,63 +1,158 @@ +use actix_rt::Arbiter; use anyhow::Result; -use log::info; -use std::path::PathBuf; -use structopt::StructOpt; -use url::Url; +use env_logger::{Builder, Env, Target}; +use gftp::rpc::{RpcBody, RpcId, RpcMessage, RpcRequest, RpcResult, RpcStatusResult}; +use std::mem; +use structopt::{clap, StructOpt}; +use tokio::io; +use tokio::io::AsyncBufReadExt; +use tokio::time::Duration; #[derive(StructOpt)] -pub enum CmdLine { - Publish { files: Vec }, - Download { url: Url, output_file: PathBuf }, - Upload { file: PathBuf, url: Url }, - AwaitUpload { filepath: PathBuf }, +struct Args { + #[structopt(flatten)] + command: Command, + /// Increases output verbosity + #[structopt( + short, + long, + set = clap::ArgSettings::Global, + )] + verbose: bool, } -#[actix_rt::main] -async fn main() -> Result<()> { - dotenv::dotenv().ok(); - env_logger::init(); - - let cmd_args = CmdLine::from_args(); +#[derive(StructOpt)] +#[structopt(global_setting = clap::AppSettings::DeriveDisplayOrder)] +enum Command { + #[structopt(flatten)] + Command(RpcRequest), + /// Starts in JSON RPC server mode + Server, +} - match cmd_args { - CmdLine::Publish { files } => { - for path in files { - let url = gftp::publish(&path).await?; - info!("Published file [{}] as {}.", &path.display(), url); - } +#[derive(Debug, Clone, Copy)] +enum ExecMode { + OneShot, + Service, + Shutdown, +} - actix_rt::signal::ctrl_c().await?; - info!("Received ctrl-c signal. Shutting down.") +async fn execute(id: Option, request: RpcRequest, verbose: bool) -> ExecMode { + let id = id.as_ref(); + match execute_inner(id, request, verbose).await { + Ok(exec_mode) => exec_mode, + Err(error) => { + RpcMessage::error(id, error).print(verbose); + ExecMode::OneShot } - CmdLine::Download { url, output_file } => { - info!( - "Downloading file from [{}], target path [{}].", - url, - output_file.display() - ); + } +} +async fn execute_inner(id: Option<&RpcId>, request: RpcRequest, verbose: bool) -> Result { + let exec_mode = match request { + RpcRequest::Version {} => { + let version = clap::crate_version!().to_string(); + RpcMessage::response(id, RpcResult::String(version)).print(verbose); + ExecMode::OneShot + } + RpcRequest::Publish { files } => { + let mut result = Vec::new(); + for file in files { + let url = gftp::publish(&file).await?; + result.push((file, url)); + } + match result.len() { + 0 => RpcMessage::request_error(id), + _ => RpcMessage::files_response(id, result), + } + .print(verbose); + ExecMode::Service + } + RpcRequest::Close { urls } => { + let mut statuses = Vec::with_capacity(urls.len()); + for url in urls { + let result = gftp::close(&url).await?; + statuses.push(result.into()) + } + match statuses.len() { + 0 => RpcMessage::request_error(id), + _ => RpcMessage::response(id, RpcResult::Statuses(statuses)), + } + .print(verbose); + ExecMode::OneShot + } + RpcRequest::Download { url, output_file } => { gftp::download_from_url(&url, &output_file).await?; - info!("File downloaded.") - } - CmdLine::Upload { file, url } => { - info!( - "Uploading file [{}] to address [{}].", - &file.display(), - &url - ); + RpcMessage::file_response(id, output_file, url).print(verbose); + ExecMode::OneShot + } + RpcRequest::Receive { output_file } => { + let url = gftp::open_for_upload(&output_file).await?; + RpcMessage::file_response(id, output_file, url).print(verbose); + ExecMode::Service + } + RpcRequest::Upload { file, url } => { gftp::upload_file(&file, &url).await?; + RpcMessage::file_response(id, file, url).print(verbose); + ExecMode::OneShot } - CmdLine::AwaitUpload { filepath } => { - let url = gftp::open_for_upload(&filepath).await?; - info!( - "Waiting for file upload [{}] on url [{}].", - &filepath.display(), - &url - ); + RpcRequest::Shutdown {} => { + RpcMessage::response(id, RpcResult::Status(RpcStatusResult::Ok)).print(verbose); + ExecMode::Shutdown + } + }; + + Ok(exec_mode) +} + +async fn server_loop() { + let mut reader = io::BufReader::new(io::stdin()); + let mut buffer = String::new(); + let verbose = true; - actix_rt::signal::ctrl_c().await?; - info!("Received ctrl-c signal. Shutting down.") + loop { + let string = match reader.read_line(&mut buffer).await { + Ok(_) => mem::replace(&mut buffer, String::new()), + Err(_) => break, + }; + match serde_json::from_str::(&string) { + Ok(msg) => { + let id = msg.id.clone(); + if let Err(error) = msg.validate() { + RpcMessage::error(id.as_ref(), error).print(verbose); + continue; + } + match msg.body { + RpcBody::Request { request } => Arbiter::spawn(async move { + if let ExecMode::Shutdown = execute(id, request, verbose).await { + tokio::time::delay_for(Duration::from_secs(1)).await; + std::process::exit(0); + } + }), + _ => RpcMessage::request_error(id.as_ref()).print(verbose), + } + } + Err(err) => RpcMessage::error(None, err).print(verbose), } } +} + +#[actix_rt::main] +async fn main() -> Result<()> { + dotenv::dotenv().ok(); + + let mut builder = Builder::from_env(Env::new()); + builder.target(Target::Stderr); + builder.init(); + + let args = Args::from_args(); + match args.command { + Command::Command(request) => match execute(None, request, args.verbose).await { + ExecMode::Service => actix_rt::signal::ctrl_c().await?, + _ => log::debug!("Shutting down"), + }, + Command::Server => server_loop().await, + } + Ok(()) } diff --git a/core/gftp/src/gftp.rs b/core/gftp/src/gftp.rs index da2e90c4e3..6b0df99ab1 100644 --- a/core/gftp/src/gftp.rs +++ b/core/gftp/src/gftp.rs @@ -1,7 +1,6 @@ -use anyhow::{Context, Error, Result}; +use anyhow::{anyhow, Context, Error, Result}; use futures::lock::Mutex; use futures::prelude::*; -use log::{debug, info, warn}; use rand::distributions::Alphanumeric; use rand::Rng; use sha3::{Digest, Sha3_256}; @@ -75,7 +74,7 @@ impl FileDesc { chunk_size } as usize; - debug!("Reading chunk at offset: {}, size: {}", offset, chunk_size); + log::debug!("Reading chunk at offset: {}, size: {}", offset, chunk_size); let mut buffer = vec![0u8; bytes_to_read]; { let mut file = self.file.lock().await; @@ -106,6 +105,20 @@ pub async fn publish(path: &Path) -> Result { Ok(gftp_url(&filedesc.hash).await?) } +pub async fn close(url: &Url) -> Result { + let hash_name = match url.path_segments() { + Some(segments) => match segments.last() { + Some(segment) => segment, + _ => return Err(anyhow!("Invalid URL: {:?}", url)), + }, + _ => return Err(anyhow!("Invalid URL: {:?}", url)), + }; + + bus::unbind(model::file_bus_id(hash_name).as_str()) + .await + .map_err(|e| anyhow!(e)) +} + // =========================================== // // File download - client side ("provider") // =========================================== // @@ -117,14 +130,14 @@ pub async fn download_from_url(url: &Url, dst_path: &Path) -> Result<()> { pub async fn download_file(node_id: NodeId, hash: &str, dst_path: &Path) -> Result<()> { let remote = node_id.try_service(&model::file_bus_id(hash))?; - debug!("Creating target file {}", dst_path.display()); + log::debug!("Creating target file {}", dst_path.display()); let mut file = create_dest_file(dst_path)?; - info!("Loading file {} metadata.", dst_path.display()); + log::debug!("Loading file {} metadata.", dst_path.display()); let metadata = remote.send(model::GetMetadata {}).await??; - debug!("Metadata: file size {}.", metadata.file_size); + log::debug!("Metadata: file size {}.", metadata.file_size); let chunk_size = DEFAULT_CHUNK_SIZE; let num_chunks = (metadata.file_size + (chunk_size - 1)) / chunk_size; // Divide and round up. @@ -209,24 +222,25 @@ async fn upload_finished( msg: model::UploadFinished, ) -> Result<(), model::Error> { if let Some(expected_hash) = msg.hash { - info!("Upload finished. Verifying hash..."); + log::debug!("Upload finished. Verifying hash..."); let mut file = file.lock().await; let real_hash = hash_file_sha256(&mut file) .map_err(|error| model::Error::InternalError(error.to_string()))?; if expected_hash != real_hash { - warn!( + log::debug!( "Uploaded file hash {} is different than expected hash {}.", - &real_hash, &expected_hash + &real_hash, + &expected_hash ); //TODO: We should notify publisher about not matching hash. // Now we send error only for uploader. return Err(model::Error::IntegrityError); } - info!("File hash matches expected hash {}.", &expected_hash); + log::debug!("File hash matches expected hash {}.", &expected_hash); } else { - info!("Upload finished. Expected file hash not provided. Omitting validation."); + log::debug!("Upload finished. Expected file hash not provided. Omitting validation."); } //TODO: unsubscribe gsb events. @@ -241,7 +255,7 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> { let (node_id, random_filename) = extract_url(url)?; let remote = node_id.try_service(&model::file_bus_id(&random_filename))?; - debug!("Opening file to send {}.", path.display()); + log::debug!("Opening file to send {}.", path.display()); let chunk_size = DEFAULT_CHUNK_SIZE; @@ -257,14 +271,14 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> { .try_for_each(|_| future::ok(())) .await?; - debug!("Computing file hash."); + log::debug!("Computing file hash."); let hash = hash_file_sha256(&mut File::open(path)?)?; - info!("File [{}] has hash [{}].", path.display(), &hash); + log::debug!("File [{}] has hash [{}].", path.display(), &hash); remote .call(model::UploadFinished { hash: Some(hash) }) .await??; - info!("Upload finished correctly."); + log::debug!("Upload finished correctly."); Ok(()) } diff --git a/core/gftp/src/lib.rs b/core/gftp/src/lib.rs index 43e66b507a..dc8967f246 100644 --- a/core/gftp/src/lib.rs +++ b/core/gftp/src/lib.rs @@ -1,6 +1,7 @@ mod gftp; +pub mod rpc; pub use self::gftp::{ - download_file, download_from_url, extract_url, open_for_upload, publish, upload_file, + close, download_file, download_from_url, extract_url, open_for_upload, publish, upload_file, DEFAULT_CHUNK_SIZE, }; diff --git a/core/gftp/src/rpc.rs b/core/gftp/src/rpc.rs new file mode 100644 index 0000000000..afbccf377f --- /dev/null +++ b/core/gftp/src/rpc.rs @@ -0,0 +1,223 @@ +use serde::{Deserialize, Serialize}; +use std::io::Write; +use std::path::PathBuf; +use structopt::StructOpt; +use url::Url; + +const JSON_RPC_VERSION: &str = "2.0"; + +#[allow(unused)] +#[derive(Debug)] +#[repr(i32)] +pub enum JsonRpcError { + /// Invalid JSON was received by the server. + ParseError = -32700, + /// The JSON sent is not a valid Request object. + InvalidRequest = -32600, + /// The method does not exist / is not available. + MethodNotFound = -32601, + /// Invalid method parameter(s). + InvalidParams = -32602, + /// Internal JSON-RPC error. + InternalError = -32603, + /// Server error + ServerError = -32000, +} + +impl ToString for JsonRpcError { + fn to_string(&self) -> String { + format!("{:?}", self) + } +} + +impl From for JsonRpcError { + fn from(e: serde_json::Error) -> Self { + use serde_json::error::Category; + match e.classify() { + Category::Data => JsonRpcError::InvalidRequest, + Category::Eof | Category::Io | Category::Syntax => JsonRpcError::ParseError, + } + } +} + +impl From for JsonRpcError { + fn from(_: std::io::Error) -> Self { + JsonRpcError::ServerError + } +} + +impl From for JsonRpcError { + fn from(_: anyhow::Error) -> Self { + JsonRpcError::ServerError + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RpcMessage { + pub jsonrpc: String, + pub id: Option, + #[serde(flatten)] + pub body: RpcBody, +} + +impl RpcMessage { + pub fn request(id: Option<&RpcId>, request: RpcRequest) -> Self { + RpcMessage { + jsonrpc: JSON_RPC_VERSION.to_string(), + id: id.cloned(), + body: RpcBody::Request { request }, + } + } + + pub fn response(id: Option<&RpcId>, result: RpcResult) -> Self { + RpcMessage { + jsonrpc: JSON_RPC_VERSION.to_string(), + id: id.cloned(), + body: RpcBody::Result { result }, + } + } + + pub fn file_response(id: Option<&RpcId>, file: PathBuf, url: Url) -> Self { + Self::response(id, RpcResult::File(RpcFileResult { file, url })) + } + + pub fn files_response(id: Option<&RpcId>, items: Vec<(PathBuf, Url)>) -> Self { + let items = items + .into_iter() + .map(|(file, url)| RpcFileResult { file, url }) + .collect(); + Self::response(id, RpcResult::Files(items)) + } + + pub fn error + ToString>(id: Option<&RpcId>, err: E) -> Self { + let message = err.to_string(); + RpcMessage { + jsonrpc: JSON_RPC_VERSION.to_string(), + id: id.cloned(), + body: RpcBody::Error { + error: RpcError { + message, + code: err.into() as i32, + }, + }, + } + } + + pub fn request_error(id: Option<&RpcId>) -> Self { + Self::error(id, JsonRpcError::InvalidRequest) + } + + pub fn validate(&self) -> Result<(), JsonRpcError> { + if self.jsonrpc.as_str() != JSON_RPC_VERSION { + return Err(JsonRpcError::InvalidRequest); + } + Ok(()) + } + + pub fn print(&self, verbose: bool) { + let mut stdout = std::io::stdout(); + let json = match verbose { + true => serde_json::to_string(self).unwrap(), + false => serde_json::to_string(&self.body).unwrap(), + }; + let _ = stdout.write_fmt(format_args!("{}\r\n", json)); + let _ = stdout.flush(); + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum RpcId { + Int(i64), + Float(f64), + String(String), +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +#[serde(rename_all = "snake_case")] +pub enum RpcBody { + Request { + #[serde(flatten)] + request: RpcRequest, + }, + Result { + result: RpcResult, + }, + Error { + error: RpcError, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[serde(tag = "method", content = "params")] +#[serde(rename_all = "snake_case")] +pub enum RpcRequest { + /// Prints out version + Version {}, + /// Publishes files (blocking) + Publish { files: Vec }, + /// Stops publishing a file + Close { urls: Vec }, + /// Downloads a file + Download { + /// Source URL + url: Url, + /// Destination path + output_file: PathBuf, + }, + /// Waits for file upload (blocking) + Receive { + /// Destination path + output_file: PathBuf, + }, + /// Uploads a file + Upload { + /// Destination URL + url: Url, + /// Source path + file: PathBuf, + }, + /// Shuts down the server + Shutdown {}, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum RpcResult { + String(String), + File(RpcFileResult), + Files(Vec), + Status(RpcStatusResult), + Statuses(Vec), +} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] +#[serde(rename_all = "snake_case")] +pub enum RpcStatusResult { + Ok, + Error, +} + +impl From for RpcStatusResult { + fn from(b: bool) -> Self { + match b { + true => RpcStatusResult::Ok, + false => RpcStatusResult::Error, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +pub struct RpcFileResult { + pub file: PathBuf, + pub url: Url, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +pub struct RpcError { + pub code: i32, + pub message: String, +} diff --git a/service-bus/bus/src/local_router.rs b/service-bus/bus/src/local_router.rs index 3877afca15..a14523cb25 100644 --- a/service-bus/bus/src/local_router.rs +++ b/service-bus/bus/src/local_router.rs @@ -332,10 +332,37 @@ impl Router { let addr = format!("{}/{}", addr, T::ID); log::debug!("binding {}", addr); let _ = self.handlers.insert(addr.clone(), slot); - RemoteRouter::from_registry().do_send(UpdateService::Add(addr.into())); + RemoteRouter::from_registry().do_send(UpdateService::Add(addr)); Handle { _inner: () } } + pub fn unbind(&mut self, addr: &str) -> impl Future> + Unpin { + let pattern = match addr.ends_with('/') { + true => addr.to_string(), + false => format!("{}/", addr), + }; + let addrs = self + .handlers + .keys() + .filter(|a| a.starts_with(&pattern)) + .cloned() + .collect::>(); + + addrs.iter().for_each(|addr| { + log::debug!("unbinding {}", addr); + self.handlers.remove(&addr); + }); + + Box::pin(async move { + let router = RemoteRouter::from_registry(); + let success = !addrs.is_empty(); + for addr in addrs { + router.send(UpdateService::Remove(addr)).await?; + } + Ok(success) + }) + } + pub fn bind_stream( &mut self, addr: &str, diff --git a/service-bus/bus/src/remote_router.rs b/service-bus/bus/src/remote_router.rs index 8668de888a..013931e471 100644 --- a/service-bus/bus/src/remote_router.rs +++ b/service-bus/bus/src/remote_router.rs @@ -124,7 +124,6 @@ impl SystemService for RemoteRouter {} pub enum UpdateService { Add(String), - #[allow(dead_code)] Remove(String), } @@ -149,6 +148,12 @@ impl Handler for RemoteRouter { self.local_bindings.insert(service_id); } UpdateService::Remove(service_id) => { + if let Some(c) = &mut self.connection { + Arbiter::spawn(c.unbind(service_id.clone()).then(|v| async { + v.unwrap_or_else(|e| log::error!("unbind error: {}", e)) + })) + } + log::trace!("Unbinding local service '{}'", service_id); self.local_bindings.remove(&service_id); } } diff --git a/service-bus/bus/src/typed.rs b/service-bus/bus/src/typed.rs index 86bb4f3481..da52ed7a19 100644 --- a/service-bus/bus/src/typed.rs +++ b/service-bus/bus/src/typed.rs @@ -40,6 +40,11 @@ pub fn bind(addr: &str, endpoint: impl RpcHandler + Unpin + 's router().lock().unwrap().bind(addr, endpoint) } +#[inline] +pub async fn unbind(addr: &str) -> Result { + router().lock().unwrap().unbind(addr).await +} + pub fn bind_stream( addr: &str, endpoint: impl RpcStreamHandler + Unpin + 'static,