Skip to content

Commit

Permalink
net, node, sdk/js: add backpressure support to MonteSocket, implement…
Browse files Browse the repository at this point in the history
… ResponsePacket to allow for more flexibility of responses from microservices
  • Loading branch information
lithdew committed Jun 12, 2020
1 parent ae8b1ce commit 4caf26c
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 165 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ replace github.com/lithdew/kademlia => ../kademlia

require (
github.com/BurntSushi/toml v0.3.1
github.com/davecgh/go-spew v1.1.1
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.10
github.com/julienschmidt/httprouter v1.3.0
github.com/lithdew/bytesutil v0.0.0-20200409052507-d98389230a59
github.com/lithdew/kademlia v0.0.0-20200607181215-ff07ba2ac940
github.com/lithdew/monte v0.0.0-20200612062106-02975bf1abd2
github.com/lithdew/monte v0.0.0-20200612163155-688f7a476468
github.com/stretchr/testify v1.6.0 // indirect
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ github.com/lithdew/monte v0.0.0-20200611093340-15ff088304c9 h1:XmfrU1ZyBDuiZ2/UU
github.com/lithdew/monte v0.0.0-20200611093340-15ff088304c9/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc=
github.com/lithdew/monte v0.0.0-20200612062106-02975bf1abd2 h1:m1MUK1qrH7bJyKiY5x4n02kxrwZuNt+6z2UNUniD1FA=
github.com/lithdew/monte v0.0.0-20200612062106-02975bf1abd2/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc=
github.com/lithdew/monte v0.0.0-20200612162118-d9ef724a278f h1:ePrvQ/QMnHVDjWfan0twg6lD1WYy03Un8kxH7dAl86k=
github.com/lithdew/monte v0.0.0-20200612162118-d9ef724a278f/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc=
github.com/lithdew/monte v0.0.0-20200612163155-688f7a476468 h1:uVJAVZL+BulMCLaRtjVwp4jH2d6IIls+QjkIZdddCvY=
github.com/lithdew/monte v0.0.0-20200612163155-688f7a476468/go.mod h1:qisbxmkjwkE+XOLjCiJe+z1U4f95XxF5Hfrk21Rb1yc=
github.com/lithdew/reliable v0.0.0-20200506103725-7df64908b057 h1:CBhKVPym/7ZzY7ascOG93XSTlJuqrmIU/Hd0UDHC1TA=
github.com/lithdew/reliable v0.0.0-20200506103725-7df64908b057/go.mod h1:b9iSDHZ4DaCGpwhQdKsH0u61UancBXJMe0r8SCPKEEA=
github.com/lithdew/seq v0.0.0-20200504083424-74d5d8117a05 h1:j1UtG8NYCupA5xUwQ/vrTf/zjuNlZ0D1n7UtM8LhS58=
Expand Down
38 changes: 38 additions & 0 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ const (
OpcodeRequest
)

type PayloadPacket struct {
Headers map[string]string
Body []byte
}

type HandshakePacket struct {
ID kademlia.ID
Services []string
Expand Down Expand Up @@ -138,6 +143,39 @@ func (h HandshakePacket) Validate(dst []byte) error {
return nil
}

type ResponsePacket []byte

func (r ResponsePacket) AppendTo(dst []byte) []byte {
if r == nil {
dst = append(dst, 0)
} else {
dst = append(dst, 1)
dst = bytesutil.AppendUint32BE(dst, uint32(len(r)))
dst = append(dst, r...)
}
return dst
}

func UnmarshalResponsePacket(buf []byte) (ResponsePacket, error) {
if len(buf) < 1 || (buf[0] != 0 && buf[0] != 1) {
return nil, io.ErrUnexpectedEOF
}
handled := buf[0] == 1
buf = buf[1:]
if !handled {
return nil, nil
}
if len(buf) < 4 {
return nil, io.ErrUnexpectedEOF
}
size := bytesutil.Uint32BE(buf[:4])
buf = buf[4:]
if uint32(len(buf)) < size {
return nil, io.ErrUnexpectedEOF
}
return buf, nil
}

type RequestPacket struct {
Services []string
Data []byte
Expand Down
6 changes: 6 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ func (n *Node) Process(services []string, data []byte) ([]byte, error) {
for _, provider := range providers {
dst, err = provider.conn.Request(dst[:0], req)
if err == nil {
dst, err = UnmarshalResponsePacket(dst)
}
if err == nil && dst != nil {
return dst, nil
}
if err != nil {
fmt.Println(err)
}
}

return nil, fmt.Errorf("no nodes were able to process your request for service(s): %s", services)
Expand Down
6 changes: 1 addition & 5 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ func (p *Peer) HandleMessage(ctx *monte.Context) error {
if handler == nil {
continue
}
res := handler(ftx)
if res != nil {
return ctx.Reply(res)
}
return nil
return ctx.Reply(ResponsePacket(handler(ftx)).AppendTo(nil))
}

return nil
Expand Down
68 changes: 45 additions & 23 deletions ts/main.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import nacl from "tweetnacl";
import {EventEmitter} from "events";
import MonteSocket from "./monte.js";
import {HandshakePacket, ID, RequestPacket} from "./packet.js";
import {HandshakePacket, ID, RequestPacket, ResponsePacket} from "./packet.js";
import ip from "ip";

const OPCODE_HANDSHAKE = 0;
const OPCODE_REQUEST = 1;


class FlatendClient {
class Flatend {
/**
*
* @param {ID} id
* @param {nacl.SignKeyPair} keys
* @param {ID} [id]
* @param {nacl.SignKeyPair} [keys]
*/
constructor({id, keys}) {
this.listeners = new EventEmitter();
Expand All @@ -30,7 +30,18 @@ class FlatendClient {
throw new Error(`Service '${service}' is already registered.`)

this.listeners.on(service, ({seq, data}) => {
this.conn.reply(seq, handler(data));
let res;
try {
res = handler(data);
} catch (err) {
res = {error: err.toString()}
}

if (!Buffer.isBuffer(res) && typeof res !== "string") {
res = JSON.stringify(res);
}

this.conn.reply(seq, new ResponsePacket(Buffer.from(res)).encode());
});
}

Expand All @@ -41,6 +52,22 @@ class FlatendClient {

this.conn.on('data', this._data.bind(this));
this.conn.on('error', console.error);

this.conn.on('close', () => {
const reconnect = async () => {
console.log(`Trying to reconnect to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}. Sleeping for 1s.`);

try {
await this.start({port: 9000, host: "127.0.0.1"});

console.log(`Successfully connected to ${ip.toString(this.id.host, 12, 4)}:${this.id.port}.`);
} catch (err) {
setTimeout(reconnect, 1000);
}
};

setTimeout(reconnect, 1000);
});
}

_data({seq, frame}) {
Expand All @@ -53,7 +80,7 @@ class FlatendClient {

const service = packet.services.find(service => this.listeners.listenerCount(service) > 0);
if (!service) {
// reply with an error
this.conn.reply(seq, new ResponsePacket().encode());
return;
}

Expand Down Expand Up @@ -97,28 +124,23 @@ async function main() {
const keys = nacl.sign.keyPair();
const id = new ID({publicKey: keys.publicKey, host: "127.0.0.1", port: 12000});

const client = new FlatendClient({id, keys});
client.register("get_todos", data => data);
const backend = new Flatend({id, keys});

await client.start({port: 9000, host: "127.0.0.1"});
backend.register("get_todos", data => {
data = JSON.parse(data);

client.conn.on('close', () => {
const reconnect = async () => {
console.log(`Trying to reconnect to ${ip.toString(client.id.host, 12, 4)}:${client.id.port}. Sleeping for 1s.`);
if (parseInt(data?.params?.id) !== 123)
throw new Error(`ID must be 123. Got ${data?.params?.id}.`);

try {
await client.start({port: 9000, host: "127.0.0.1"});
return data;
});

console.log(`Successfully connected to ${ip.toString(client.id.host, 12, 4)}:${client.id.port}.`);
} catch (err) {
setTimeout(reconnect, 1000);
}
};
backend.register("all_todos", data => "hello world!")

await backend.start({port: 9000, host: "127.0.0.1"});

setTimeout(reconnect, 1000);
})

console.log(`Successfully connected to ${ip.toString(client.id.host, 12, 4)}:${client.id.port}.`);
console.log(`Successfully connected to ${ip.toString(backend.id.host, 12, 4)}:${backend.id.port}.`);
}

main().catch(err => console.error(err));
main().catch(err => console.error(err));
20 changes: 16 additions & 4 deletions ts/monte.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {Duplex} from "stream";
import stream from "readable-stream";
import {EventEmitter} from "events";
import net from "net";
import nacl from "tweetnacl";
import blake2b from "blake2b";
import crypto from "crypto";

const {Duplex} = stream;

/**
*
* @param {Socket} sock
Expand Down Expand Up @@ -53,6 +55,8 @@ class MonteSocket extends Duplex {
this.pending = new EventEmitter();
this.counter = 0;

this._paused = false;

this.conn.on('close', had_error => this.emit('close', had_error));
this.conn.on('connect', () => this.emit('connect'));
this.conn.on('drain', () => this.emit('drain'));
Expand Down Expand Up @@ -185,19 +189,21 @@ class MonteSocket extends Duplex {
this.conn.write(Buffer.concat([header, chunk]), callback);
}

_read(_size) {
_read() {
this._paused = false;
setImmediate(this._readable.bind(this));
}

_readable() {
while (true) {
while (!this._paused) {
const header = this.conn.read(4);
if (!header) return;

const length = header.readUInt32BE();

let frame = this.conn.read(length);
if (!frame) {
console.log(`Trying to read ${length} byte(s), only ${this.conn.readableLength} byte(s) available.`)
this.conn.unshift(header);
return;
}
Expand All @@ -213,7 +219,9 @@ class MonteSocket extends Duplex {

if (seq === 0 || this.pending.listenerCount(`${seq}`) === 0) {
try {
this.push({seq, frame});
if (!this.push({seq, frame})) {
this._paused = true;
}
} catch (err) {
this.emit("error", err);
}
Expand All @@ -222,6 +230,10 @@ class MonteSocket extends Duplex {
}
}
}

_final(cb) {
this.conn.end(cb);
}
}

export default MonteSocket;
106 changes: 0 additions & 106 deletions ts/package-lock.json

This file was deleted.

Loading

0 comments on commit 4caf26c

Please sign in to comment.