Skip to content

Commit

Permalink
sdk/node: implement FindNodeRequest, FindNodeResponse, and kademlia r…
Browse files Browse the repository at this point in the history
…outing table
  • Loading branch information
lithdew committed Jun 17, 2020
1 parent f10b026 commit c6118b7
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 13 deletions.
30 changes: 17 additions & 13 deletions nodejs/src/flatend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,18 @@ export class Context extends Duplex {
}

class Client {
addr: string;
sock: MonteSocket;

count: number; // streams count
streams: Map<number, Context>; // stream id -> stream

id?: ID | null;
services?: string[];
id?: ID;
services: string[] = [];


constructor(sock: MonteSocket) {
constructor(addr: string, sock: MonteSocket) {
this.addr = addr;
this.sock = sock;

this.count = 0;
Expand Down Expand Up @@ -192,22 +194,20 @@ export class Node {
const port = parseInt(fields[1]);
assert(port > 0 && port < 65536)

addr = host + ":" + port;

let client = this.#clients.get(addr);
if (!client) {
client = new Client(await MonteSocket.connect({host: host, port: port}));
client = new Client(host + ":" + port, await MonteSocket.connect({host: host, port: port}));

client.sock.once('end', () => {
client?.services?.forEach(service => this.#services.get(service)?.delete(client!));
client!.services.forEach(service => this.#services.get(service)?.delete(client!));
this.#providers.delete(client!.sock);
this.#clients.delete(addr);
this.#clients.delete(client!.addr);

const reconnect = async () => {
console.log(`Trying to reconnect to ${addr}. Sleeping for 1s.`);
console.log(`Trying to reconnect to ${client!.addr}. Sleeping for 1s.`);

try {
await this.dial(addr);
await this.dial(client!.addr);
} catch (err) {
setTimeout(reconnect, 1000);
}
Expand All @@ -219,10 +219,14 @@ export class Node {
client.sock.on('data', this._data.bind(this));
client.sock.on('error', console.error);

this.#clients.set(addr, client);
this.#clients.set(client.addr, client);
this.#providers.set(client.sock, client);
}

await this.probe(client);
}

private async probe(client: Client) {
let packet = new HandshakePacket(null, this.services, null);
if (!this.anonymous) {
packet.id = this.#id;
Expand All @@ -233,7 +237,7 @@ export class Node {
packet = HandshakePacket.decode(res)[0];

if (packet.id && packet.signature) {
assert(typeof packet.id.host === "string" && ip.isEqual(packet.id.host, host) && packet.id.port === port);
// assert(typeof packet.id.host === "string" && ip.isEqual(packet.id.host, host) && packet.id.port === port);
assert(nacl.sign.detached.verify(packet.payload, packet.signature, packet.id.publicKey));

client.id = packet.id;
Expand All @@ -246,7 +250,7 @@ export class Node {
this.#services.get(service)!.add(client!);
});

console.log(`Successfully dialed ${addr}. Services: [${packet.services.join(', ')}]`);
console.log(`Successfully dialed ${client.addr}. Services: [${packet.services.join(', ')}]`);
}

private _data({sock, seq, body}: { sock: MonteSocket, seq: number, body: Buffer }) {
Expand Down
141 changes: 141 additions & 0 deletions nodejs/src/packet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export enum Opcode {
ServiceRequest,
ServiceResponse,
Data,
FindNodeRequest,
FindNodeResponse,
}

export interface Packet {
Expand Down Expand Up @@ -303,4 +305,143 @@ export class DataPacket implements Packet {

return [new DataPacket(id, data), buf];
}
}

export class FindNodeRequest implements Packet {
target: ID

public constructor(target: ID) {
this.target = target;
}

public encode(): Buffer {
return this.target.encode();
}

public static decode(buf: Buffer): [FindNodeRequest, Buffer] {
const [target, leftover] = ID.decode(buf);
buf = leftover;
return [new FindNodeRequest(target), buf]
}
}

export class FindNodeResponse implements Packet {
closest: ID[]

public constructor(closest: ID[]) {
this.closest = closest;
}

public encode(): Buffer {
return Buffer.concat([Buffer.of(this.closest.length), ...this.closest.map(id => id.encode())]);
}

public decode(buf: Buffer): [FindNodeResponse, Buffer] {
const closestLen = buf.readUInt8();
buf = buf.slice(1);

const closest = [...Array(closestLen)].map(() => {
const [id, leftover] = ID.decode(buf);
buf = leftover;
return id;
});

return [new FindNodeResponse(closest), buf];
}
}

export enum UpdateResult {
New,
Ok,
Full,
Fail,
}

const leadingZeros = (buf: Uint8Array): number => {
const i = buf.findIndex(b => b != 0);
if (i == -1) return buf.byteLength * 8;

let b = buf[i] >>> 0;
if (b === 0) return i * 8 + 8;
return i * 8 + (7 - (Math.log(b) / Math.LN2 | 0) | 0);
}

const xor = (a: Uint8Array, b: Uint8Array): Uint8Array => {
const c = Buffer.alloc(Math.min(a.byteLength, b.byteLength))
for (let i = 0; i < c.byteLength; i++) c[i] = a[i] ^ b[i];
return c;
}

export class Table {
id: ID;
cap: number = 16;
length: number = 0;
buckets: Array<Array<ID>> = [...Array(nacl.sign.publicKeyLength * 8)].map(() => []);

public constructor(id: ID) {
this.id = id;
}

private bucketIndex(pub: Uint8Array): number {
if (Buffer.compare(pub, this.id.publicKey) === 0) return 0;
return leadingZeros(xor(pub, this.id.publicKey));
}

public update(id: ID): UpdateResult {
if (Buffer.compare(id.publicKey, this.id.publicKey) === 0) return UpdateResult.Fail;

const bucket = this.buckets[this.bucketIndex(id.publicKey)];

const i = bucket.findIndex(item => Buffer.compare(item.publicKey, id.publicKey) === 0);
if (i >= 0) {
bucket.unshift(...bucket.splice(i, 1));
return UpdateResult.Ok;
}

if (bucket.length < this.cap) {
bucket.unshift(id);
this.length++;
return UpdateResult.New;
}
return UpdateResult.Full;
}

public delete(pub: Uint8Array): boolean {
const bucket = this.buckets[this.bucketIndex(pub)];
const i = bucket.findIndex(id => Buffer.compare(id.publicKey, pub) === 0);
if (i >= 1) {
bucket.splice(i, 1);
this.length--;
return true;
}
return false;
}

public has(pub: Uint8Array): boolean {
const bucket = this.buckets[this.bucketIndex(pub)];
return !!bucket.find(id => Buffer.compare(id.publicKey, pub) === 0);
}

public closestTo(pub: Uint8Array, k = this.cap): ID[] {
const closest: ID[] = [];

const fill = (i: number) => {
const bucket = this.buckets[i];
for (let i = 0; closest.length < k && i < bucket.length; i++) {
if (Buffer.compare(bucket[i].publicKey, pub) != 0) closest.push(bucket[i]);
}
return closest.length < k;
};

const m = this.bucketIndex(pub);
let l = m - 1;
let r = m + 1;

fill(m);
while ((l >= 0 && fill(l)) || (r < this.buckets.length && fill(r))) {
[l, r] = [l-1, r+1];
}

return closest;
}
}

0 comments on commit c6118b7

Please sign in to comment.