diff --git a/go.mod b/go.mod index 59b92cd..27375bb 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,13 @@ replace github.com/lithdew/kademlia => ../kademlia require ( github.com/BurntSushi/toml v0.3.1 + github.com/davecgh/go-spew v1.1.1 github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.10 github.com/julienschmidt/httprouter v1.3.0 github.com/lithdew/bytesutil v0.0.0-20200409052507-d98389230a59 github.com/lithdew/kademlia v0.0.0-20200607181215-ff07ba2ac940 - github.com/lithdew/monte v0.0.0-20200612062106-02975bf1abd2 + github.com/lithdew/monte v0.0.0-20200612163155-688f7a476468 github.com/stretchr/testify v1.6.0 // indirect golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375 // indirect diff --git a/go.sum b/go.sum index 31cf06f..6c38e05 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,10 @@ github.com/lithdew/monte v0.0.0-20200611093340-15ff088304c9 h1:XmfrU1ZyBDuiZ2/UU github.com/lithdew/monte v0.0.0-20200611093340-15ff088304c9/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc= github.com/lithdew/monte v0.0.0-20200612062106-02975bf1abd2 h1:m1MUK1qrH7bJyKiY5x4n02kxrwZuNt+6z2UNUniD1FA= github.com/lithdew/monte v0.0.0-20200612062106-02975bf1abd2/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc= +github.com/lithdew/monte v0.0.0-20200612162118-d9ef724a278f h1:ePrvQ/QMnHVDjWfan0twg6lD1WYy03Un8kxH7dAl86k= +github.com/lithdew/monte v0.0.0-20200612162118-d9ef724a278f/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc= +github.com/lithdew/monte v0.0.0-20200612163155-688f7a476468 h1:uVJAVZL+BulMCLaRtjVwp4jH2d6IIls+QjkIZdddCvY= +github.com/lithdew/monte v0.0.0-20200612163155-688f7a476468/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc= github.com/lithdew/reliable v0.0.0-20200506103725-7df64908b057 h1:CBhKVPym/7ZzY7ascOG93XSTlJuqrmIU/Hd0UDHC1TA= github.com/lithdew/reliable v0.0.0-20200506103725-7df64908b057/go.mod h1:b9iSDHZ4DaCGpwhQdKsH0u61UancBXJMe0r8SCPKEEA= github.com/lithdew/seq v0.0.0-20200504083424-74d5d8117a05 h1:j1UtG8NYCupA5xUwQ/vrTf/zjuNlZ0D1n7UtM8LhS58= diff --git a/net.go b/net.go index 6d7bc08..9a4c7a0 100644 --- a/net.go +++ b/net.go @@ -49,6 +49,11 @@ const ( OpcodeRequest ) +type PayloadPacket struct { + Headers map[string]string + Body []byte +} + type HandshakePacket struct { ID kademlia.ID Services []string @@ -138,6 +143,39 @@ func (h HandshakePacket) Validate(dst []byte) error { return nil } +type ResponsePacket []byte + +func (r ResponsePacket) AppendTo(dst []byte) []byte { + if r == nil { + dst = append(dst, 0) + } else { + dst = append(dst, 1) + dst = bytesutil.AppendUint32BE(dst, uint32(len(r))) + dst = append(dst, r...) + } + return dst +} + +func UnmarshalResponsePacket(buf []byte) (ResponsePacket, error) { + if len(buf) < 1 || (buf[0] != 0 && buf[0] != 1) { + return nil, io.ErrUnexpectedEOF + } + handled := buf[0] == 1 + buf = buf[1:] + if !handled { + return nil, nil + } + if len(buf) < 4 { + return nil, io.ErrUnexpectedEOF + } + size := bytesutil.Uint32BE(buf[:4]) + buf = buf[4:] + if uint32(len(buf)) < size { + return nil, io.ErrUnexpectedEOF + } + return buf, nil +} + type RequestPacket struct { Services []string Data []byte diff --git a/node.go b/node.go index 6407e8d..36e14e7 100644 --- a/node.go +++ b/node.go @@ -122,8 +122,14 @@ func (n *Node) Process(services []string, data []byte) ([]byte, error) { for _, provider := range providers { dst, err = provider.conn.Request(dst[:0], req) if err == nil { + dst, err = UnmarshalResponsePacket(dst) + } + if err == nil && dst != nil { return dst, nil } + if err != nil { + fmt.Println(err) + } } return nil, fmt.Errorf("no nodes were able to process your request for service(s): %s", services) diff --git a/peer.go b/peer.go index 3b89d20..b3fcf50 100644 --- a/peer.go +++ b/peer.go @@ -57,11 +57,7 @@ func (p *Peer) HandleMessage(ctx *monte.Context) error { if handler == nil { continue } - res := handler(ftx) - if res != nil { - return ctx.Reply(res) - } - return nil + return ctx.Reply(ResponsePacket(handler(ftx)).AppendTo(nil)) } return nil diff --git a/ts/main.js b/ts/main.js index 64e85d9..74a1325 100644 --- a/ts/main.js +++ b/ts/main.js @@ -1,18 +1,18 @@ import nacl from "tweetnacl"; import {EventEmitter} from "events"; import MonteSocket from "./monte.js"; -import {HandshakePacket, ID, RequestPacket} from "./packet.js"; +import {HandshakePacket, ID, RequestPacket, ResponsePacket} from "./packet.js"; import ip from "ip"; const OPCODE_HANDSHAKE = 0; const OPCODE_REQUEST = 1; -class FlatendClient { +class Flatend { /** * - * @param {ID} id - * @param {nacl.SignKeyPair} keys + * @param {ID} [id] + * @param {nacl.SignKeyPair} [keys] */ constructor({id, keys}) { this.listeners = new EventEmitter(); @@ -30,7 +30,18 @@ class FlatendClient { throw new Error(`Service '${service}' is already registered.`) this.listeners.on(service, ({seq, data}) => { - this.conn.reply(seq, handler(data)); + let res; + try { + res = handler(data); + } catch (err) { + res = {error: err.toString()} + } + + if (!Buffer.isBuffer(res) && typeof res !== "string") { + res = JSON.stringify(res); + } + + this.conn.reply(seq, new ResponsePacket(Buffer.from(res)).encode()); }); } @@ -41,6 +52,22 @@ class FlatendClient { this.conn.on('data', this._data.bind(this)); this.conn.on('error', console.error); + + this.conn.on('close', () => { + const reconnect = async () => { + console.log(`Trying to reconnect to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}. Sleeping for 1s.`); + + try { + await this.start({port: 9000, host: "127.0.0.1"}); + + console.log(`Successfully connected to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}.`); + } catch (err) { + setTimeout(reconnect, 1000); + } + }; + + setTimeout(reconnect, 1000); + }); } _data({seq, frame}) { @@ -53,7 +80,7 @@ class FlatendClient { const service = packet.services.find(service => this.listeners.listenerCount(service) > 0); if (!service) { - // reply with an error + this.conn.reply(seq, new ResponsePacket().encode()); return; } @@ -97,28 +124,23 @@ async function main() { const keys = nacl.sign.keyPair(); const id = new ID({publicKey: keys.publicKey, host: "127.0.0.1", port: 12000}); - const client = new FlatendClient({id, keys}); - client.register("get_todos", data => data); + const backend = new Flatend({id, keys}); - await client.start({port: 9000, host: "127.0.0.1"}); + backend.register("get_todos", data => { + data = JSON.parse(data); - client.conn.on('close', () => { - const reconnect = async () => { - console.log(`Trying to reconnect to ${ip.toString(client.id.host, 12, 4)}:${client.id.port}. Sleeping for 1s.`); + if (parseInt(data?.params?.id) !== 123) + throw new Error(`ID must be 123. Got ${data?.params?.id}.`); - try { - await client.start({port: 9000, host: "127.0.0.1"}); + return data; + }); - console.log(`Successfully connected to ${ip.toString(client.id.host, 12, 4)}:${client.id.port}.`); - } catch (err) { - setTimeout(reconnect, 1000); - } - }; + backend.register("all_todos", data => "hello world!") + + await backend.start({port: 9000, host: "127.0.0.1"}); - setTimeout(reconnect, 1000); - }) - console.log(`Successfully connected to ${ip.toString(client.id.host, 12, 4)}:${client.id.port}.`); + console.log(`Successfully connected to ${ip.toString(backend.id.host, 12, 4)}:${backend.id.port}.`); } -main().catch(err => console.error(err)); \ No newline at end of file +main().catch(err => console.error(err)); diff --git a/ts/monte.js b/ts/monte.js index 4bfc734..aace08b 100644 --- a/ts/monte.js +++ b/ts/monte.js @@ -1,10 +1,12 @@ -import {Duplex} from "stream"; +import stream from "readable-stream"; import {EventEmitter} from "events"; import net from "net"; import nacl from "tweetnacl"; import blake2b from "blake2b"; import crypto from "crypto"; +const {Duplex} = stream; + /** * * @param {Socket} sock @@ -53,6 +55,8 @@ class MonteSocket extends Duplex { this.pending = new EventEmitter(); this.counter = 0; + this._paused = false; + this.conn.on('close', had_error => this.emit('close', had_error)); this.conn.on('connect', () => this.emit('connect')); this.conn.on('drain', () => this.emit('drain')); @@ -185,12 +189,13 @@ class MonteSocket extends Duplex { this.conn.write(Buffer.concat([header, chunk]), callback); } - _read(_size) { + _read() { + this._paused = false; setImmediate(this._readable.bind(this)); } _readable() { - while (true) { + while (!this._paused) { const header = this.conn.read(4); if (!header) return; @@ -198,6 +203,7 @@ class MonteSocket extends Duplex { let frame = this.conn.read(length); if (!frame) { + console.log(`Trying to read ${length} byte(s), only ${this.conn.readableLength} byte(s) available.`) this.conn.unshift(header); return; } @@ -213,7 +219,9 @@ class MonteSocket extends Duplex { if (seq === 0 || this.pending.listenerCount(`${seq}`) === 0) { try { - this.push({seq, frame}); + if (!this.push({seq, frame})) { + this._paused = true; + } } catch (err) { this.emit("error", err); } @@ -222,6 +230,10 @@ class MonteSocket extends Duplex { } } } + + _final(cb) { + this.conn.end(cb); + } } export default MonteSocket; \ No newline at end of file diff --git a/ts/package-lock.json b/ts/package-lock.json deleted file mode 100644 index 3958f68..0000000 --- a/ts/package-lock.json +++ /dev/null @@ -1,106 +0,0 @@ -{ - "name": "flatend", - "version": "1.0.0", - "lockfileVersion": 1, - "requires": true, - "dependencies": { - "@types/node": { - "version": "14.0.13", - "resolved": "https://registry.npmjs.org/@types/node/-/node-14.0.13.tgz", - "integrity": "sha512-rouEWBImiRaSJsVA+ITTFM6ZxibuAlTuNOCyxVbwreu6k6+ujs7DfnU9o+PShFhET78pMBl3eH+AGSI5eOTkPA==", - "dev": true - }, - "arg": { - "version": "4.1.3", - "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", - "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", - "dev": true - }, - "blake2b": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/blake2b/-/blake2b-2.1.3.tgz", - "integrity": "sha512-pkDss4xFVbMb4270aCyGD3qLv92314Et+FsKzilCLxDz5DuZ2/1g3w4nmBbu6nKApPspnjG7JcwTjGZnduB1yg==", - "requires": { - "blake2b-wasm": "^1.1.0", - "nanoassert": "^1.0.0" - } - }, - "blake2b-wasm": { - "version": "1.1.7", - "resolved": "https://registry.npmjs.org/blake2b-wasm/-/blake2b-wasm-1.1.7.tgz", - "integrity": "sha512-oFIHvXhlz/DUgF0kq5B1CqxIDjIJwh9iDeUUGQUcvgiGz7Wdw03McEO7CfLBy7QKGdsydcMCgO9jFNBAFCtFcA==", - "requires": { - "nanoassert": "^1.0.0" - } - }, - "buffer-from": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz", - "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==", - "dev": true - }, - "diff": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", - "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", - "dev": true - }, - "make-error": { - "version": "1.3.6", - "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", - "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", - "dev": true - }, - "nanoassert": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/nanoassert/-/nanoassert-1.1.0.tgz", - "integrity": "sha1-TzFS4JVA/eKMdvRLGbvNHVpCR40=" - }, - "source-map": { - "version": "0.6.1", - "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", - "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==", - "dev": true - }, - "source-map-support": { - "version": "0.5.19", - "resolved": "https://registry.npmjs.org/source-map-support/-/source-map-support-0.5.19.tgz", - "integrity": "sha512-Wonm7zOCIJzBGQdB+thsPar0kYuCIzYvxZwlBa87yi/Mdjv7Tip2cyVbLj5o0cFPN4EVkuTwb3GDDyUx2DGnGw==", - "dev": true, - "requires": { - "buffer-from": "^1.0.0", - "source-map": "^0.6.0" - } - }, - "ts-node": { - "version": "8.10.2", - "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-8.10.2.tgz", - "integrity": "sha512-ISJJGgkIpDdBhWVu3jufsWpK3Rzo7bdiIXJjQc0ynKxVOVcg2oIrf2H2cejminGrptVc6q6/uynAHNCuWGbpVA==", - "dev": true, - "requires": { - "arg": "^4.1.0", - "diff": "^4.0.1", - "make-error": "^1.1.1", - "source-map-support": "^0.5.17", - "yn": "3.1.1" - } - }, - "tweetnacl": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz", - "integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==" - }, - "typescript": { - "version": "3.9.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.9.5.tgz", - "integrity": "sha512-hSAifV3k+i6lEoCJ2k6R2Z/rp/H3+8sdmcn5NrS3/3kE7+RyZXm9aqvxWqjEXHAd8b0pShatpcdMTvEdvAJltQ==", - "dev": true - }, - "yn": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", - "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", - "dev": true - } - } -} diff --git a/ts/package.json b/ts/package.json index 2efb881..d3b7128 100644 --- a/ts/package.json +++ b/ts/package.json @@ -9,6 +9,7 @@ "dependencies": { "blake2b": "^2.1.3", "ip": "^1.1.5", + "readable-stream": "^3.6.0", "tweetnacl": "^1.0.3" } } diff --git a/ts/packet.js b/ts/packet.js index eef89ba..90363b1 100644 --- a/ts/packet.js +++ b/ts/packet.js @@ -14,6 +14,31 @@ class ID { this.port = port; } + /** + * + * @return {Buffer} + */ + encode() { + let host; + if (typeof this.host === "string") { + if (ip.isV4Format(this.host)) { + host = Buffer.concat([Buffer.of(0), ip.toBuffer(this.host)]); + } else if (ip.isV6Format(this.host)) { + host = Buffer.concat([Buffer.of(1), ip.toBuffer(this.host)]); + } else { + host = Buffer.of(2); + } + } else if (Buffer.isBuffer(this.host)) { + const type = this.host.byteLength === 4 ? 0 : this.host.byteLength === 16 ? 1 : 2; + host = Buffer.concat([Buffer.of(type), this.host]); + } + + const port = Buffer.alloc(2); + port.writeUInt16BE(this.port); + + return Buffer.concat([this.publicKey, host, port]); + } + /** * * @param {Buffer} buf @@ -46,31 +71,6 @@ class ID { return [new ID({publicKey, host, port}), buf]; } - - /** - * - * @return {Buffer} - */ - encode() { - let host; - if (typeof this.host === "string") { - if (ip.isV4Format(this.host)) { - host = Buffer.concat([Buffer.of(0), ip.toBuffer(this.host)]); - } else if (ip.isV6Format(this.host)) { - host = Buffer.concat([Buffer.of(1), ip.toBuffer(this.host)]); - } else { - host = Buffer.of(2); - } - } else if (Buffer.isBuffer(this.host)) { - const type = this.host.byteLength === 4 ? 0 : this.host.byteLength === 16 ? 1 : 2; - host = Buffer.concat([Buffer.of(type), this.host]); - } - - const port = Buffer.alloc(2); - port.writeUInt16BE(this.port); - - return Buffer.concat([this.publicKey, host, port]); - } } @@ -108,6 +108,11 @@ class HandshakePacket { ]); } + /** + * + * @param {Buffer} buf + * @return {HandshakePacket} + */ static decode(buf) { let decoded = ID.decode(buf); @@ -165,6 +170,11 @@ class RequestPacket { return Buffer.concat([services, data, this.data]) } + /** + * + * @param {Buffer} buf + * @return {RequestPacket} + */ static decode(buf) { let size = buf.readUInt8(); buf = buf.slice(1); @@ -188,4 +198,38 @@ class RequestPacket { } } -export {ID, HandshakePacket, RequestPacket}; \ No newline at end of file +class ResponsePacket { + constructor( + data = null + ) { + this.data = data; + } + + encode() { + if (this.data) { + const header = Buffer.alloc(5); + header.writeUInt8(1); + header.writeUInt32BE(this.data.byteLength, 1); + return Buffer.concat([header, this.data]); + } + return Buffer.of(0); + } + + /** + * + * @param {Buffer} buf + */ + static decode(buf) { + const handled = buf.readUInt8() === 1; + buf = buf.slice(1); + + if (!handled) return new ResponsePacket(); + + const size = buf.readUInt32BE(); + buf = buf.slice(4); + + return new ResponsePacket(buf.slice(0, size)); + } +} + +export {ID, HandshakePacket, RequestPacket, ResponsePacket}; \ No newline at end of file diff --git a/ts/yarn.lock b/ts/yarn.lock index 731aa7b..9a96ebf 100644 --- a/ts/yarn.lock +++ b/ts/yarn.lock @@ -17,6 +17,11 @@ blake2b@^2.1.3: blake2b-wasm "^1.1.0" nanoassert "^1.0.0" +inherits@^2.0.3: + version "2.0.4" + resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c" + integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== + ip@^1.1.5: version "1.1.5" resolved "https://registry.yarnpkg.com/ip/-/ip-1.1.5.tgz#bdded70114290828c0a039e72ef25f5aaec4354a" @@ -27,7 +32,33 @@ nanoassert@^1.0.0: resolved "https://registry.yarnpkg.com/nanoassert/-/nanoassert-1.1.0.tgz#4f3152e09540fde28c76f44b19bbcd1d5a42478d" integrity sha1-TzFS4JVA/eKMdvRLGbvNHVpCR40= +readable-stream@^3.6.0: + version "3.6.0" + resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198" + integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA== + dependencies: + inherits "^2.0.3" + string_decoder "^1.1.1" + util-deprecate "^1.0.1" + +safe-buffer@~5.2.0: + version "5.2.1" + resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" + integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== + +string_decoder@^1.1.1: + version "1.3.0" + resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.3.0.tgz#42f114594a46cf1a8e30b0a84f56c78c3edac21e" + integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== + dependencies: + safe-buffer "~5.2.0" + tweetnacl@^1.0.3: version "1.0.3" resolved "https://registry.yarnpkg.com/tweetnacl/-/tweetnacl-1.0.3.tgz#ac0af71680458d8a6378d0d0d050ab1407d35596" integrity sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw== + +util-deprecate@^1.0.1: + version "1.0.2" + resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf" + integrity sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=