diff --git a/cmd/flatend/main.go b/cmd/flatend/main.go index 4ff88e8..8d1b429 100644 --- a/cmd/flatend/main.go +++ b/cmd/flatend/main.go @@ -8,6 +8,7 @@ import ( "github.com/julienschmidt/httprouter" "github.com/lithdew/flatend" "github.com/lithdew/kademlia" + "github.com/spf13/pflag" "io/ioutil" "net" "net/http" @@ -160,7 +161,16 @@ var Methods = map[string]struct{}{ } func main() { - buf, err := ioutil.ReadFile("config.toml") + var configPath string + var bindHost net.IP + var bindPort uint16 + + pflag.StringVarP(&configPath, "config", "c", "config.toml", "path to config file") + pflag.IPVarP(&bindHost, "host", "h", net.IPv4(127, 0, 0, 1), "bind host") + pflag.Uint16VarP(&bindPort, "port", "p", 9000, "bind port") + pflag.Parse() + + buf, err := ioutil.ReadFile(configPath) check(err) var cfg Config @@ -170,7 +180,7 @@ func main() { _, priv, err := kademlia.GenerateKeys(nil) check(err) - addr := "127.0.0.1:9000" + addr := flatend.Addr(bindHost, bindPort) node, err := flatend.NewNode(priv, addr) check(err) @@ -178,6 +188,8 @@ func main() { ln, err := net.Listen("tcp", addr) check(err) + fmt.Printf("Listening for microservices on %s.\n", ln.Addr()) + go func() { check(node.Serve(ln)) }() diff --git a/go.mod b/go.mod index eaafd80..1a9e7a8 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/lithdew/bytesutil v0.0.0-20200409052507-d98389230a59 github.com/lithdew/kademlia v0.0.0-20200607181215-ff07ba2ac940 github.com/lithdew/monte v0.0.0-20200613034704-f6d482652dc3 + github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.6.0 // indirect golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375 // indirect diff --git a/go.sum b/go.sum index bd638e8..a4e950c 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e h1:85L+lUTJHx4O7 github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e/go.mod h1:xIpCyrK2ouGA4QBGbiNbkoONrvJ00u9P3QOkXSOAC0c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= diff --git a/node.go b/node.go index 36e14e7..fab3d47 100644 --- a/node.go +++ b/node.go @@ -185,7 +185,7 @@ func (n *Node) handleHandshakePacket(ctx *monte.Context, body []byte) error { ID: n.id, Services: n.Services(), } - pkt.Signature = n.sec.Sign(pkt.AppendPayloadTo(nil)) + pkt.Signature = n.sec.Sign(pkt.AppendPayloadTo(body[:0])) return ctx.Reply(pkt.AppendTo(body[:0])) } diff --git a/ts/flatend.js b/ts/flatend.js new file mode 100644 index 0000000..6f878f9 --- /dev/null +++ b/ts/flatend.js @@ -0,0 +1,122 @@ +import {EventEmitter} from "events"; +import {HandshakePacket, RequestPacket, ResponsePacket} from "./packet.js"; +import MonteSocket from "./monte.js"; +import ip from "ip"; + +const OPCODE_HANDSHAKE = 0; +const OPCODE_REQUEST = 1; + +class Flatend { + /** + * + * @param {ID} [id] + * @param {nacl.SignKeyPair} [keys] + */ + constructor({id, keys}) { + this.listeners = new EventEmitter(); + this.self = id; + this.keys = keys; + } + + /** + * + * @param {string} service + * @param handler + */ + register(service, handler) { + if (this.listeners.listenerCount(service) > 0) + throw new Error(`Service '${service}' is already registered.`) + + this.listeners.on(service, ({seq, data}) => { + let res; + try { + res = handler(data); + } catch (err) { + res = {error: err.toString()} + } + + if (!Buffer.isBuffer(res) && typeof res !== "string") { + res = JSON.stringify(res); + } + + this.conn.reply(seq, new ResponsePacket(res && Buffer.from(res)).encode()); + }); + } + + async start(opts) { + this.conn = await MonteSocket.dial(opts); + + await this.handshake(); + + this.conn.on('data', this._data.bind(this)); + this.conn.on('error', console.error); + + this.conn.once('close', () => { + const reconnect = async () => { + console.log(`Trying to reconnect to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}. Sleeping for 1s.`); + + try { + await this.start({port: 9000, host: "127.0.0.1"}); + + console.log(`Successfully connected to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}.`); + } catch (err) { + this.conn.destroy(err); + + setTimeout(reconnect, 1000); + } + }; + + setTimeout(reconnect, 1000); + }); + } + + _data({seq, frame}) { + const opcode = frame.readUInt8(); + frame = frame.slice(1); + + switch (opcode) { + case OPCODE_REQUEST: + const packet = RequestPacket.decode(frame); + + const service = packet.services.find(service => this.listeners.listenerCount(service) > 0); + if (!service) { + this.conn.reply(seq, new ResponsePacket().encode()); + return; + } + + this.listeners.emit(service, {seq, data: packet.data}); + // emit out, catch, grab result from return statement and + break; + default: + throw new Error(`Unknown opcode ${opcode}.`); + } + } + + async handshake() { + let packet = new HandshakePacket( + {id: this.self, services: [...this.listeners.eventNames()]} + ).sign(this.keys.secretKey); + + packet = HandshakePacket.decode(await this.request(OPCODE_HANDSHAKE, packet.encode())); + + this.id = packet.id; + this.services = packet.services; + } + + _prepare(opcode, data) { + const header = Buffer.alloc(1); + header.writeUInt8(opcode); + + return Buffer.concat([header, data]); + } + + async request(opcode, data, timeout = 3000) { + return await this.conn.request(this._prepare(opcode, data), timeout) + } + + send(opcode, data) { + return this.conn.send(this._prepare(opcode, data)); + } +} + +export {Flatend}; \ No newline at end of file diff --git a/ts/main.js b/ts/main.js index e27239b..6af3045 100644 --- a/ts/main.js +++ b/ts/main.js @@ -1,126 +1,8 @@ import nacl from "tweetnacl"; -import {EventEmitter} from "events"; -import MonteSocket from "./monte.js"; -import {HandshakePacket, ID, RequestPacket, ResponsePacket} from "./packet.js"; +import {Flatend} from "./flatend.js"; +import {ID} from "./packet.js"; import ip from "ip"; -const OPCODE_HANDSHAKE = 0; -const OPCODE_REQUEST = 1; - - -class Flatend { - /** - * - * @param {ID} [id] - * @param {nacl.SignKeyPair} [keys] - */ - constructor({id, keys}) { - this.listeners = new EventEmitter(); - this.self = id; - this.keys = keys; - } - - /** - * - * @param {string} service - * @param handler - */ - register(service, handler) { - if (this.listeners.listenerCount(service) > 0) - throw new Error(`Service '${service}' is already registered.`) - - this.listeners.on(service, ({seq, data}) => { - let res; - try { - res = handler(data); - } catch (err) { - res = {error: err.toString()} - } - - if (!Buffer.isBuffer(res) && typeof res !== "string") { - res = JSON.stringify(res); - } - - this.conn.reply(seq, new ResponsePacket(res && Buffer.from(res)).encode()); - }); - } - - async start(opts) { - this.conn = await MonteSocket.dial(opts); - - await this.handshake(); - - this.conn.on('data', this._data.bind(this)); - this.conn.on('error', console.error); - - this.conn.once('close', () => { - const reconnect = async () => { - console.log(`Trying to reconnect to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}. Sleeping for 1s.`); - - try { - await this.start({port: 9000, host: "127.0.0.1"}); - - console.log(`Successfully connected to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}.`); - } catch (err) { - this.conn.destroy(err); - - setTimeout(reconnect, 1000); - } - }; - - setTimeout(reconnect, 1000); - }); - } - - _data({seq, frame}) { - const opcode = frame.readUInt8(); - frame = frame.slice(1); - - switch (opcode) { - case OPCODE_REQUEST: - const packet = RequestPacket.decode(frame); - - const service = packet.services.find(service => this.listeners.listenerCount(service) > 0); - if (!service) { - this.conn.reply(seq, new ResponsePacket().encode()); - return; - } - - this.listeners.emit(service, {seq, data: packet.data}); - // emit out, catch, grab result from return statement and - break; - default: - throw new Error(`Unknown opcode ${opcode}.`); - } - } - - async handshake() { - let packet = new HandshakePacket( - {id: this.self, services: [...this.listeners.eventNames()]} - ).sign(this.keys.secretKey); - - packet = HandshakePacket.decode(await this.request(OPCODE_HANDSHAKE, packet.encode())); - - this.id = packet.id; - this.services = packet.services; - } - - _prepare(opcode, data) { - const header = Buffer.alloc(1); - header.writeUInt8(opcode); - - return Buffer.concat([header, data]); - } - - async request(opcode, data, timeout = 3000) { - return await this.conn.request(this._prepare(opcode, data), timeout) - } - - send(opcode, data) { - return this.conn.send(this._prepare(opcode, data)); - } -} - async function main() { const keys = nacl.sign.keyPair(); @@ -137,7 +19,7 @@ async function main() { return data; }); - backend.register("all_todos", data => {}) + backend.register("all_todos", () => "hello world!"); await backend.start({port: 9000, host: "127.0.0.1"});