Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DecryptionParams may be passed when using queryHistory instead of just keys. #927

Merged
merged 6 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Simple connection management that selects the most recent connection for store, light push and filter requests.

### Changed

- **breaking**: `DecryptionParams` may be passed when using `queryHistory` instead of just keys.

## [0.25.0] - 2022-09-5

### Changed
Expand Down
44 changes: 8 additions & 36 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 0 additions & 10 deletions src/lib/enr/keypair/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ export const ERR_TYPE_NOT_IMPLEMENTED = "Keypair type not implemented";
export * from "./types";
export * from "./secp256k1";

// TODO: Check if @libp2p/crypto methods can be used instead.
export async function generateKeypair(type: KeypairType): Promise<IKeypair> {
switch (type) {
case KeypairType.secp256k1:
return await Secp256k1Keypair.generate();
default:
throw new Error(ERR_TYPE_NOT_IMPLEMENTED);
}
}

export function createKeypair(
type: KeypairType,
privateKey?: Uint8Array,
Expand Down
24 changes: 24 additions & 0 deletions src/lib/select_connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Connection } from "@libp2p/interface-connection";

export function selectConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];

let latestConnection: Connection | undefined;

connections.forEach((connection) => {
if (connection.stat.status === "OPEN") {
if (!latestConnection) {
latestConnection = connection;
} else if (
connection.stat.timeline.open > latestConnection.stat.timeline.open
) {
latestConnection = connection;
}
}
});

return latestConnection;
}
14 changes: 6 additions & 8 deletions src/lib/waku_filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { Libp2p } from "libp2p";

import { WakuMessage as WakuMessageProto } from "../../proto/message";
import { DefaultPubSubTopic } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
Expand Down Expand Up @@ -166,7 +167,7 @@ export class WakuFilter {
return;
}

const decryptionKeys = Array.from(this.decryptionKeys).map(
const decryptionParams = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
Expand All @@ -177,7 +178,7 @@ export class WakuFilter {
);

for (const message of messages) {
const decoded = await WakuMessage.decodeProto(message, decryptionKeys);
const decoded = await WakuMessage.decodeProto(message, decryptionParams);
if (!decoded) {
log("Not able to decode message");
continue;
Expand Down Expand Up @@ -216,17 +217,14 @@ export class WakuFilter {
}
}

// Should be able to remove any at next libp2p release >0.37.3
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) {
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}

// TODO: Appropriate connection selection
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: tsc is confused by the @libp2p/interface-connection type to use
return connections[0].newStream(FilterCodec);
return connection.newStream(FilterCodec);
}

private async getPeer(peerId?: PeerId): Promise<Peer> {
Expand Down
8 changes: 5 additions & 3 deletions src/lib/waku_light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Uint8ArrayList } from "uint8arraylist";

import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { WakuMessage } from "../waku_message";

Expand Down Expand Up @@ -59,10 +60,11 @@ export class WakuLightPush {
throw "Peer does not register waku light push protocol";

const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) throw "Failed to get a connection to the peer";
const connection = selectConnection(connections);

// TODO: Appropriate connection management
const stream = await connections[0].newStream(LightPushCodec);
if (!connection) throw "Failed to get a connection to the peer";

const stream = await connection.newStream(LightPushCodec);
try {
const pubSubTopic = opts?.pubSubTopic
? opts.pubSubTopic
Expand Down
4 changes: 1 addition & 3 deletions src/lib/waku_message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export interface Options {
sigPrivKey?: Uint8Array;
}

// TODO: Use this in Options
export interface DecryptionParams {
key: Uint8Array;
method?: DecryptionMethod;
Expand Down Expand Up @@ -270,8 +269,7 @@ export class WakuMessage {
}

get version(): number {
// TODO: absent value should be replaced by default
// value of the type by the protobuf decoder
// https://github.com/status-im/js-waku/issues/921
return this.proto.version ?? 0;
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib/waku_relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class WakuRelay extends GossipSub {
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic === pubSubTopic) {
const decryptionKeys = Array.from(this.decryptionKeys).map(
const decryptionParams = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
Expand All @@ -195,7 +195,7 @@ export class WakuRelay extends GossipSub {
);

dbg(`Message received on ${pubSubTopic}`);
WakuMessage.decode(event.detail.msg.data, decryptionKeys)
WakuMessage.decode(event.detail.msg.data, decryptionParams)
.then((wakuMsg) => {
if (!wakuMsg) {
dbg("Failed to decode Waku Message");
Expand Down
18 changes: 7 additions & 11 deletions src/lib/waku_store/index.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,8 @@ describe("Waku Store", () => {
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true, store: true, lightpush: true });

const encryptedAsymmetricMessageText =
"This message is encrypted for me using asymmetric";
const encryptedSymmetricMessageText =
"This message is encrypted for me using symmetric encryption";
const encryptedAsymmetricMessageText = "asymmetric encryption";
const encryptedSymmetricMessageText = "symmetric encryption";
const clearMessageText =
"This is a clear text message for everyone to read";
const otherEncMessageText =
Expand Down Expand Up @@ -304,14 +302,12 @@ describe("Waku Store", () => {

dbg("Retrieve messages from store");
const messages = await waku2.store.queryHistory([], {
decryptionKeys: [privateKey],
decryptionParams: [{ key: privateKey }],
});

expect(messages?.length).eq(3);
if (!messages) throw "Length was tested";
expect(messages[0].payloadAsUtf8).to.eq(clearMessageText);
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText);
expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);

!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
Expand Down Expand Up @@ -411,7 +407,7 @@ describe("Waku Store", () => {

dbg("Retrieve messages from store");
const messages = await waku2.store.queryHistory([], {
decryptionKeys: [privateKey],
decryptionParams: [{ key: privateKey }],
});

expect(messages?.length).eq(3);
Expand Down
49 changes: 24 additions & 25 deletions src/lib/waku_store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import { Uint8ArrayList } from "uint8arraylist";
import * as protoV2Beta4 from "../../proto/store_v2beta4";
import { HistoryResponse } from "../../proto/store_v2beta4";
import { DefaultPubSubTopic, StoreCodecs } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import {
DecryptionMethod,
DecryptionParams,
WakuMessage,
} from "../waku_message";

import { HistoryRPC, PageDirection } from "./history_rpc";

Expand Down Expand Up @@ -90,7 +95,7 @@ export interface QueryOptions {
* It can be Asymmetric Private Keys and Symmetric Keys in the same array,
* all keys will be tried with both methods.
*/
decryptionKeys?: Array<Uint8Array | string>;
decryptionParams?: DecryptionParams[];
}

/**
Expand Down Expand Up @@ -171,36 +176,30 @@ export class WakuStore {

Object.assign(opts, { storeCodec });
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections || !connections.length)
throw "Failed to get a connection to the peer";

const decryptionKeys = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
method,
contentTopics,
};
}
);
const connection = selectConnection(connections);

if (!connection) throw "Failed to get a connection to the peer";

let decryptionParams: DecryptionParams[] = [];

this.decryptionKeys.forEach(({ method, contentTopics }, key) => {
decryptionParams.push({
key,
method,
contentTopics,
});
});

// Add the decryption keys passed to this function against the
// content topics also passed to this function.
if (opts.decryptionKeys) {
opts.decryptionKeys.forEach((key) => {
decryptionKeys.push({
key: hexToBytes(key),
contentTopics: contentTopics.length ? contentTopics : undefined,
method: undefined,
});
});
if (opts.decryptionParams) {
decryptionParams = decryptionParams.concat(opts.decryptionParams);
}

const messages: WakuMessage[] = [];
let cursor = undefined;
while (true) {
// TODO: Some connection selection logic?
const stream = await connections[0].newStream(storeCodec);
const stream = await connection.newStream(storeCodec);
const queryOpts = Object.assign(opts, { cursor });
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
dbg("Querying store peer", connections[0].remoteAddr.toString());
Expand Down Expand Up @@ -244,7 +243,7 @@ export class WakuStore {
const pageMessages: WakuMessage[] = [];
await Promise.all(
response.messages.map(async (protoMsg) => {
const msg = await WakuMessage.decodeProto(protoMsg, decryptionKeys);
const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams);

if (msg) {
messages.push(msg);
Expand Down