Skip to content

Commit

Permalink
sdk/js, cmd/flatend, node: add commandline flags to flatend, move fla…
Browse files Browse the repository at this point in the history
…tend lib to separate file in js sdk
  • Loading branch information
lithdew committed Jun 13, 2020
1 parent cfc275a commit 671217b
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 124 deletions.
16 changes: 14 additions & 2 deletions cmd/flatend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/julienschmidt/httprouter"
"github.com/lithdew/flatend"
"github.com/lithdew/kademlia"
"github.com/spf13/pflag"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -160,7 +161,16 @@ var Methods = map[string]struct{}{
}

func main() {
buf, err := ioutil.ReadFile("config.toml")
var configPath string
var bindHost net.IP
var bindPort uint16

pflag.StringVarP(&configPath, "config", "c", "config.toml", "path to config file")
pflag.IPVarP(&bindHost, "host", "h", net.IPv4(127, 0, 0, 1), "bind host")
pflag.Uint16VarP(&bindPort, "port", "p", 9000, "bind port")
pflag.Parse()

buf, err := ioutil.ReadFile(configPath)
check(err)

var cfg Config
Expand All @@ -170,14 +180,16 @@ func main() {
_, priv, err := kademlia.GenerateKeys(nil)
check(err)

addr := "127.0.0.1:9000"
addr := flatend.Addr(bindHost, bindPort)

node, err := flatend.NewNode(priv, addr)
check(err)

ln, err := net.Listen("tcp", addr)
check(err)

fmt.Printf("Listening for microservices on %s.\n", ln.Addr())

go func() {
check(node.Serve(ln))
}()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/lithdew/bytesutil v0.0.0-20200409052507-d98389230a59
github.com/lithdew/kademlia v0.0.0-20200607181215-ff07ba2ac940
github.com/lithdew/monte v0.0.0-20200613034704-f6d482652dc3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.0 // indirect
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e h1:85L+lUTJHx4O7
github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e/go.mod h1:xIpCyrK2ouGA4QBGbiNbkoONrvJ00u9P3QOkXSOAC0c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
Expand Down
2 changes: 1 addition & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (n *Node) handleHandshakePacket(ctx *monte.Context, body []byte) error {
ID: n.id,
Services: n.Services(),
}
pkt.Signature = n.sec.Sign(pkt.AppendPayloadTo(nil))
pkt.Signature = n.sec.Sign(pkt.AppendPayloadTo(body[:0]))

return ctx.Reply(pkt.AppendTo(body[:0]))
}
Expand Down
122 changes: 122 additions & 0 deletions ts/flatend.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import {EventEmitter} from "events";
import {HandshakePacket, RequestPacket, ResponsePacket} from "./packet.js";
import MonteSocket from "./monte.js";
import ip from "ip";

const OPCODE_HANDSHAKE = 0;
const OPCODE_REQUEST = 1;

class Flatend {
/**
*
* @param {ID} [id]
* @param {nacl.SignKeyPair} [keys]
*/
constructor({id, keys}) {
this.listeners = new EventEmitter();
this.self = id;
this.keys = keys;
}

/**
*
* @param {string} service
* @param handler
*/
register(service, handler) {
if (this.listeners.listenerCount(service) > 0)
throw new Error(`Service '${service}' is already registered.`)

this.listeners.on(service, ({seq, data}) => {
let res;
try {
res = handler(data);
} catch (err) {
res = {error: err.toString()}
}

if (!Buffer.isBuffer(res) && typeof res !== "string") {
res = JSON.stringify(res);
}

this.conn.reply(seq, new ResponsePacket(res && Buffer.from(res)).encode());
});
}

async start(opts) {
this.conn = await MonteSocket.dial(opts);

await this.handshake();

this.conn.on('data', this._data.bind(this));
this.conn.on('error', console.error);

this.conn.once('close', () => {
const reconnect = async () => {
console.log(`Trying to reconnect to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}. Sleeping for 1s.`);

try {
await this.start({port: 9000, host: "127.0.0.1"});

console.log(`Successfully connected to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}.`);
} catch (err) {
this.conn.destroy(err);

setTimeout(reconnect, 1000);
}
};

setTimeout(reconnect, 1000);
});
}

_data({seq, frame}) {
const opcode = frame.readUInt8();
frame = frame.slice(1);

switch (opcode) {
case OPCODE_REQUEST:
const packet = RequestPacket.decode(frame);

const service = packet.services.find(service => this.listeners.listenerCount(service) > 0);
if (!service) {
this.conn.reply(seq, new ResponsePacket().encode());
return;
}

this.listeners.emit(service, {seq, data: packet.data});
// emit out, catch, grab result from return statement and
break;
default:
throw new Error(`Unknown opcode ${opcode}.`);
}
}

async handshake() {
let packet = new HandshakePacket(
{id: this.self, services: [...this.listeners.eventNames()]}
).sign(this.keys.secretKey);

packet = HandshakePacket.decode(await this.request(OPCODE_HANDSHAKE, packet.encode()));

this.id = packet.id;
this.services = packet.services;
}

_prepare(opcode, data) {
const header = Buffer.alloc(1);
header.writeUInt8(opcode);

return Buffer.concat([header, data]);
}

async request(opcode, data, timeout = 3000) {
return await this.conn.request(this._prepare(opcode, data), timeout)
}

send(opcode, data) {
return this.conn.send(this._prepare(opcode, data));
}
}

export {Flatend};
124 changes: 3 additions & 121 deletions ts/main.js
Original file line number Diff line number Diff line change
@@ -1,126 +1,8 @@
import nacl from "tweetnacl";
import {EventEmitter} from "events";
import MonteSocket from "./monte.js";
import {HandshakePacket, ID, RequestPacket, ResponsePacket} from "./packet.js";
import {Flatend} from "./flatend.js";
import {ID} from "./packet.js";
import ip from "ip";

const OPCODE_HANDSHAKE = 0;
const OPCODE_REQUEST = 1;


class Flatend {
/**
*
* @param {ID} [id]
* @param {nacl.SignKeyPair} [keys]
*/
constructor({id, keys}) {
this.listeners = new EventEmitter();
this.self = id;
this.keys = keys;
}

/**
*
* @param {string} service
* @param handler
*/
register(service, handler) {
if (this.listeners.listenerCount(service) > 0)
throw new Error(`Service '${service}' is already registered.`)

this.listeners.on(service, ({seq, data}) => {
let res;
try {
res = handler(data);
} catch (err) {
res = {error: err.toString()}
}

if (!Buffer.isBuffer(res) && typeof res !== "string") {
res = JSON.stringify(res);
}

this.conn.reply(seq, new ResponsePacket(res && Buffer.from(res)).encode());
});
}

async start(opts) {
this.conn = await MonteSocket.dial(opts);

await this.handshake();

this.conn.on('data', this._data.bind(this));
this.conn.on('error', console.error);

this.conn.once('close', () => {
const reconnect = async () => {
console.log(`Trying to reconnect to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}. Sleeping for 1s.`);

try {
await this.start({port: 9000, host: "127.0.0.1"});

console.log(`Successfully connected to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}.`);
} catch (err) {
this.conn.destroy(err);

setTimeout(reconnect, 1000);
}
};

setTimeout(reconnect, 1000);
});
}

_data({seq, frame}) {
const opcode = frame.readUInt8();
frame = frame.slice(1);

switch (opcode) {
case OPCODE_REQUEST:
const packet = RequestPacket.decode(frame);

const service = packet.services.find(service => this.listeners.listenerCount(service) > 0);
if (!service) {
this.conn.reply(seq, new ResponsePacket().encode());
return;
}

this.listeners.emit(service, {seq, data: packet.data});
// emit out, catch, grab result from return statement and
break;
default:
throw new Error(`Unknown opcode ${opcode}.`);
}
}

async handshake() {
let packet = new HandshakePacket(
{id: this.self, services: [...this.listeners.eventNames()]}
).sign(this.keys.secretKey);

packet = HandshakePacket.decode(await this.request(OPCODE_HANDSHAKE, packet.encode()));

this.id = packet.id;
this.services = packet.services;
}

_prepare(opcode, data) {
const header = Buffer.alloc(1);
header.writeUInt8(opcode);

return Buffer.concat([header, data]);
}

async request(opcode, data, timeout = 3000) {
return await this.conn.request(this._prepare(opcode, data), timeout)
}

send(opcode, data) {
return this.conn.send(this._prepare(opcode, data));
}
}


async function main() {
const keys = nacl.sign.keyPair();
Expand All @@ -137,7 +19,7 @@ async function main() {
return data;
});

backend.register("all_todos", data => {})
backend.register("all_todos", () => "hello world!");

await backend.start({port: 9000, host: "127.0.0.1"});

Expand Down

0 comments on commit 671217b

Please sign in to comment.