From b2b14625bb35931036ab67e8324c88790c3e8097 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 5 Sep 2022 21:32:51 +1000 Subject: [PATCH 1/6] refactor: remove unused function --- src/lib/enr/keypair/index.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/lib/enr/keypair/index.ts b/src/lib/enr/keypair/index.ts index 71be1d62ab..cc1a0c6e2f 100644 --- a/src/lib/enr/keypair/index.ts +++ b/src/lib/enr/keypair/index.ts @@ -10,16 +10,6 @@ export const ERR_TYPE_NOT_IMPLEMENTED = "Keypair type not implemented"; export * from "./types"; export * from "./secp256k1"; -// TODO: Check if @libp2p/crypto methods can be used instead. -export async function generateKeypair(type: KeypairType): Promise { - switch (type) { - case KeypairType.secp256k1: - return await Secp256k1Keypair.generate(); - default: - throw new Error(ERR_TYPE_NOT_IMPLEMENTED); - } -} - export function createKeypair( type: KeypairType, privateKey?: Uint8Array, From 9dd00fc0268240cf07cc3c3658aeaf74724668da Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 5 Sep 2022 21:47:21 +1000 Subject: [PATCH 2/6] chore(deps): upgrade @libp2p/interface-connection-manager --- package-lock.json | 44 ++++++++------------------------------------ 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/package-lock.json b/package-lock.json index cef9b6a552..c085c5f607 100644 --- a/package-lock.json +++ b/package-lock.json @@ -981,11 +981,11 @@ } }, "node_modules/@libp2p/interface-connection-manager": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.2.tgz", - "integrity": "sha512-92gM7sZhVidD+vsQbc+LbI4MMvxgRjFy9kUrrsOosbtCt0nl68rIeRFKRpfX92/4QY40tL41VXT69ijCUskEwg==", + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.3.tgz", + "integrity": "sha512-zDDzAKbtCkqR/3AmZ3DAoK1bt+5vhyUruV8654R9IT5PI7IBBgFnYzvkWHDI/UDvhwT27ubofPagp0m25gQZvg==", "dependencies": { - "@libp2p/interface-connection": "^2.0.0", + "@libp2p/interface-connection": "^3.0.0", "@libp2p/interface-peer-id": "^1.0.0", "@libp2p/interfaces": "^3.0.0" }, @@ -994,21 +994,6 @@ "npm": ">=7.0.0" } }, - "node_modules/@libp2p/interface-connection-manager/node_modules/@libp2p/interface-connection": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/@libp2p/interface-connection/-/interface-connection-2.1.1.tgz", - "integrity": "sha512-gjugaMsZvfo3r4tCc/yPifVQsfLogmEmJtW+eXMNiNDna3ZfmwWD9Z+KyEwuVsXKs0C4GESXei2y4SJSCEfkbA==", - "dependencies": { - "@libp2p/interface-peer-id": "^1.0.0", - "@libp2p/interfaces": "^3.0.0", - "@multiformats/multiaddr": "^10.2.0", - "it-stream-types": "^1.0.4" - }, - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, "node_modules/@libp2p/interface-content-routing": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/@libp2p/interface-content-routing/-/interface-content-routing-1.0.2.tgz", @@ -12929,26 +12914,13 @@ } }, "@libp2p/interface-connection-manager": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.2.tgz", - "integrity": "sha512-92gM7sZhVidD+vsQbc+LbI4MMvxgRjFy9kUrrsOosbtCt0nl68rIeRFKRpfX92/4QY40tL41VXT69ijCUskEwg==", + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.3.tgz", + "integrity": "sha512-zDDzAKbtCkqR/3AmZ3DAoK1bt+5vhyUruV8654R9IT5PI7IBBgFnYzvkWHDI/UDvhwT27ubofPagp0m25gQZvg==", "requires": { - "@libp2p/interface-connection": "^2.0.0", + "@libp2p/interface-connection": "^3.0.0", "@libp2p/interface-peer-id": "^1.0.0", "@libp2p/interfaces": "^3.0.0" - }, - "dependencies": { - "@libp2p/interface-connection": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/@libp2p/interface-connection/-/interface-connection-2.1.1.tgz", - "integrity": "sha512-gjugaMsZvfo3r4tCc/yPifVQsfLogmEmJtW+eXMNiNDna3ZfmwWD9Z+KyEwuVsXKs0C4GESXei2y4SJSCEfkbA==", - "requires": { - "@libp2p/interface-peer-id": "^1.0.0", - "@libp2p/interfaces": "^3.0.0", - "@multiformats/multiaddr": "^10.2.0", - "it-stream-types": "^1.0.4" - } - } } }, "@libp2p/interface-content-routing": { From ac48792e0e3a989e9deaf026f5b6a914fffb83e3 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 5 Sep 2022 21:48:27 +1000 Subject: [PATCH 3/6] feat: implement a simple connection management --- CHANGELOG.md | 4 ++++ src/lib/select_connection.ts | 24 ++++++++++++++++++++++++ src/lib/waku_filter/index.ts | 10 ++++------ src/lib/waku_light_push/index.ts | 8 +++++--- src/lib/waku_store/index.ts | 9 +++++---- 5 files changed, 42 insertions(+), 13 deletions(-) create mode 100644 src/lib/select_connection.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index a31c267449..a3886e5d51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Simple connection management that selects the most recent connection for store, light push and filter requests. + ## [0.25.0] - 2022-09-5 ### Changed diff --git a/src/lib/select_connection.ts b/src/lib/select_connection.ts new file mode 100644 index 0000000000..05977c7ab5 --- /dev/null +++ b/src/lib/select_connection.ts @@ -0,0 +1,24 @@ +import { Connection } from "@libp2p/interface-connection"; + +export function selectConnection( + connections: Connection[] +): Connection | undefined { + if (!connections.length) return; + if (connections.length === 1) return connections[0]; + + let latestConnection: Connection | undefined; + + connections.forEach((connection) => { + if (connection.stat.status === "OPEN") { + if (!latestConnection) { + latestConnection = connection; + } else if ( + connection.stat.timeline.open > latestConnection.stat.timeline.open + ) { + latestConnection = connection; + } + } + }); + + return latestConnection; +} diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index bef0827954..c63bfd0aae 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -10,6 +10,7 @@ import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; +import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -216,17 +217,14 @@ export class WakuFilter { } } - // Should be able to remove any at next libp2p release >0.37.3 private async newStream(peer: Peer): Promise { const connections = this.libp2p.connectionManager.getConnections(peer.id); - if (!connections) { + const connection = selectConnection(connections); + if (!connection) { throw new Error("Failed to get a connection to the peer"); } - // TODO: Appropriate connection selection - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: tsc is confused by the @libp2p/interface-connection type to use - return connections[0].newStream(FilterCodec); + return connection.newStream(FilterCodec); } private async getPeer(peerId?: PeerId): Promise { diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 623069d67e..101bb514f0 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -8,6 +8,7 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; +import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { WakuMessage } from "../waku_message"; @@ -59,10 +60,11 @@ export class WakuLightPush { throw "Peer does not register waku light push protocol"; const connections = this.libp2p.connectionManager.getConnections(peer.id); - if (!connections) throw "Failed to get a connection to the peer"; + const connection = selectConnection(connections); - // TODO: Appropriate connection management - const stream = await connections[0].newStream(LightPushCodec); + if (!connection) throw "Failed to get a connection to the peer"; + + const stream = await connection.newStream(LightPushCodec); try { const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 19534a7584..d9486516be 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -10,6 +10,7 @@ import { Uint8ArrayList } from "uint8arraylist"; import * as protoV2Beta4 from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4"; import { DefaultPubSubTopic, StoreCodecs } from "../constants"; +import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -171,8 +172,9 @@ export class WakuStore { Object.assign(opts, { storeCodec }); const connections = this.libp2p.connectionManager.getConnections(peer.id); - if (!connections || !connections.length) - throw "Failed to get a connection to the peer"; + const connection = selectConnection(connections); + + if (!connection) throw "Failed to get a connection to the peer"; const decryptionKeys = Array.from(this.decryptionKeys).map( ([key, { method, contentTopics }]) => { @@ -199,8 +201,7 @@ export class WakuStore { const messages: WakuMessage[] = []; let cursor = undefined; while (true) { - // TODO: Some connection selection logic? - const stream = await connections[0].newStream(storeCodec); + const stream = await connection.newStream(storeCodec); const queryOpts = Object.assign(opts, { cursor }); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); dbg("Querying store peer", connections[0].remoteAddr.toString()); From f44e13885c1938af61892bc7cf7e89919a19281e Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 5 Sep 2022 21:49:23 +1000 Subject: [PATCH 4/6] chore: replace todo with issue reference --- src/lib/waku_message/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lib/waku_message/index.ts b/src/lib/waku_message/index.ts index 6bf6851136..1acee41889 100644 --- a/src/lib/waku_message/index.ts +++ b/src/lib/waku_message/index.ts @@ -270,8 +270,7 @@ export class WakuMessage { } get version(): number { - // TODO: absent value should be replaced by default - // value of the type by the protobuf decoder + // https://github.com/status-im/js-waku/issues/921 return this.proto.version ?? 0; } From e4d4fb1edd4a80f43c61a323de1caccf6dc07d03 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 5 Sep 2022 22:07:18 +1000 Subject: [PATCH 5/6] feat: `DecryptionParams` may be passed when using `queryHistory` --- CHANGELOG.md | 4 +++ src/lib/waku_filter/index.ts | 4 +-- src/lib/waku_message/index.ts | 1 - src/lib/waku_relay/index.ts | 4 +-- src/lib/waku_store/index.node.spec.ts | 4 +-- src/lib/waku_store/index.ts | 38 +++++++++++++-------------- 6 files changed, 28 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3886e5d51..374df414e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Simple connection management that selects the most recent connection for store, light push and filter requests. +### Changed + +- **breaking**: `DecryptionParams` may be passed when using `queryHistory` instead of just keys. + ## [0.25.0] - 2022-09-5 ### Changed diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index c63bfd0aae..bd3bc8e8c9 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -167,7 +167,7 @@ export class WakuFilter { return; } - const decryptionKeys = Array.from(this.decryptionKeys).map( + const decryptionParams = Array.from(this.decryptionKeys).map( ([key, { method, contentTopics }]) => { return { key, @@ -178,7 +178,7 @@ export class WakuFilter { ); for (const message of messages) { - const decoded = await WakuMessage.decodeProto(message, decryptionKeys); + const decoded = await WakuMessage.decodeProto(message, decryptionParams); if (!decoded) { log("Not able to decode message"); continue; diff --git a/src/lib/waku_message/index.ts b/src/lib/waku_message/index.ts index 1acee41889..0f4379b62a 100644 --- a/src/lib/waku_message/index.ts +++ b/src/lib/waku_message/index.ts @@ -38,7 +38,6 @@ export interface Options { sigPrivKey?: Uint8Array; } -// TODO: Use this in Options export interface DecryptionParams { key: Uint8Array; method?: DecryptionMethod; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index c104f0b3ab..edb46d0e8c 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -184,7 +184,7 @@ export class WakuRelay extends GossipSub { "gossipsub:message", (event: CustomEvent) => { if (event.detail.msg.topic === pubSubTopic) { - const decryptionKeys = Array.from(this.decryptionKeys).map( + const decryptionParams = Array.from(this.decryptionKeys).map( ([key, { method, contentTopics }]) => { return { key, @@ -195,7 +195,7 @@ export class WakuRelay extends GossipSub { ); dbg(`Message received on ${pubSubTopic}`); - WakuMessage.decode(event.detail.msg.data, decryptionKeys) + WakuMessage.decode(event.detail.msg.data, decryptionParams) .then((wakuMsg) => { if (!wakuMsg) { dbg("Failed to decode Waku Message"); diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 5a55ef8b34..2b05c1e21a 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -304,7 +304,7 @@ describe("Waku Store", () => { dbg("Retrieve messages from store"); const messages = await waku2.store.queryHistory([], { - decryptionKeys: [privateKey], + decryptionParams: [{ key: privateKey }], }); expect(messages?.length).eq(3); @@ -411,7 +411,7 @@ describe("Waku Store", () => { dbg("Retrieve messages from store"); const messages = await waku2.store.queryHistory([], { - decryptionKeys: [privateKey], + decryptionParams: [{ key: privateKey }], }); expect(messages?.length).eq(3); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index d9486516be..82eddc1ded 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -13,7 +13,11 @@ import { DefaultPubSubTopic, StoreCodecs } from "../constants"; import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { hexToBytes } from "../utils"; -import { DecryptionMethod, WakuMessage } from "../waku_message"; +import { + DecryptionMethod, + DecryptionParams, + WakuMessage, +} from "../waku_message"; import { HistoryRPC, PageDirection } from "./history_rpc"; @@ -91,7 +95,7 @@ export interface QueryOptions { * It can be Asymmetric Private Keys and Symmetric Keys in the same array, * all keys will be tried with both methods. */ - decryptionKeys?: Array; + decryptionParams?: DecryptionParams[]; } /** @@ -176,26 +180,20 @@ export class WakuStore { if (!connection) throw "Failed to get a connection to the peer"; - const decryptionKeys = Array.from(this.decryptionKeys).map( - ([key, { method, contentTopics }]) => { - return { - key, - method, - contentTopics, - }; - } - ); + let decryptionParams: DecryptionParams[] = []; + + this.decryptionKeys.forEach(({ method, contentTopics }, key) => { + decryptionParams.push({ + key, + method, + contentTopics, + }); + }); // Add the decryption keys passed to this function against the // content topics also passed to this function. - if (opts.decryptionKeys) { - opts.decryptionKeys.forEach((key) => { - decryptionKeys.push({ - key: hexToBytes(key), - contentTopics: contentTopics.length ? contentTopics : undefined, - method: undefined, - }); - }); + if (opts.decryptionParams) { + decryptionParams = decryptionParams.concat(opts.decryptionParams); } const messages: WakuMessage[] = []; @@ -245,7 +243,7 @@ export class WakuStore { const pageMessages: WakuMessage[] = []; await Promise.all( response.messages.map(async (protoMsg) => { - const msg = await WakuMessage.decodeProto(protoMsg, decryptionKeys); + const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams); if (msg) { messages.push(msg); From 76251c922e2c88d90c71e763a7951d21cfc1e91a Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Wed, 7 Sep 2022 12:09:23 +1000 Subject: [PATCH 6/6] test: make it easier to understand test failure --- src/lib/waku_store/index.node.spec.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 2b05c1e21a..d82bdea493 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -228,10 +228,8 @@ describe("Waku Store", () => { nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - const encryptedAsymmetricMessageText = - "This message is encrypted for me using asymmetric"; - const encryptedSymmetricMessageText = - "This message is encrypted for me using symmetric encryption"; + const encryptedAsymmetricMessageText = "asymmetric encryption"; + const encryptedSymmetricMessageText = "symmetric encryption"; const clearMessageText = "This is a clear text message for everyone to read"; const otherEncMessageText = @@ -307,11 +305,9 @@ describe("Waku Store", () => { decryptionParams: [{ key: privateKey }], }); - expect(messages?.length).eq(3); - if (!messages) throw "Length was tested"; - expect(messages[0].payloadAsUtf8).to.eq(clearMessageText); - expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); + expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText); + expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText); + expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));