Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 1MB restriction to LightPush and Relay #1351

Merged
merged 14 commits into from
May 17, 2023
362 changes: 182 additions & 180 deletions package-lock.json

Large diffs are not rendered by default.

35 changes: 26 additions & 9 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type {
import {
IEncoder,
ILightPush,
IMessage,
ProtocolCreateOptions,
ProtocolOptions,
SendError,
SendResult,
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isSizeValid } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand Down Expand Up @@ -47,12 +49,24 @@ class LightPush extends BaseProtocol implements ILightPush {
const stream = await this.newStream(peer);

const recipients: PeerId[] = [];
let error: undefined | SendError = undefined;

try {
if (!isSizeValid(message.payload)) {
log("Failed to send waku light push: message is bigger that 1MB");
return {
recipients,
error: SendError.SIZE_TOO_BIG,
};
}

const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
return { recipients };
return {
recipients,
error: SendError.ENCODE_FAILED,
};
}
const query = PushRpc.createRequest(protoMessage, pubSubTopic);
const res = await pipe(
Expand All @@ -70,21 +84,24 @@ class LightPush extends BaseProtocol implements ILightPush {

const response = PushRpc.decode(bytes).response;

if (!response) {
log("No response in PushRPC");
return { recipients };
}

if (response.isSuccess) {
if (response?.isSuccess) {
recipients.push(peer.id);
} else {
log("No response in PushRPC");
error = SendError.NO_RPC_RESPONSE;
}
} catch (err) {
log("Failed to decode push reply", err);
error = SendError.DECODE_FAILED;
}
} catch (err) {
log("Failed to send waku light push request", err);
error = SendError.GENERIC_FAIL;
}
return { recipients };
return {
error,
recipients,
};
}
}

Expand Down
9 changes: 9 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ export type Callback<T extends IDecodedMessage> = (
msg: T
) => void | Promise<void>;

export enum SendError {
GENERIC_FAIL = "Generic error",
ENCODE_FAILED = "Failed to encode",
DECODE_FAILED = "Failed to decode",
SIZE_TOO_BIG = "Size is too big",
NO_RPC_RESPONSE = "No RPC response",
}

export interface SendResult {
error?: SendError;
recipients: PeerId[];
}
18 changes: 15 additions & 3 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PubSub } from "@libp2p/interface-pubsub";
import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubSubTopic } from "@waku/core";
import type {
import {
ActiveSubscriptions,
Callback,
IAsyncIterator,
Expand All @@ -21,9 +21,10 @@ import type {
IRelay,
ProtocolCreateOptions,
ProtocolOptions,
SendError,
SendResult,
} from "@waku/interfaces";
import { groupByContentTopic, toAsyncIterator } from "@waku/utils";
import { groupByContentTopic, isSizeValid, toAsyncIterator } from "@waku/utils";
import debug from "debug";

import { RelayCodecs } from "./constants.js";
Expand Down Expand Up @@ -97,10 +98,21 @@ class Relay implements IRelay {
* Send Waku message.
*/
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
if (!isSizeValid(message.payload)) {
log("Failed to send waku relay: message is bigger that 1MB");
return {
recipients: [],
error: SendError.SIZE_TOO_BIG,
};
}

const msg = await encoder.toWire(message);
if (!msg) {
log("Failed to encode message, aborting publish");
return { recipients: [] };
return {
recipients: [],
error: SendError.ENCODE_FAILED,
};
}

return this.gossipSub.publish(this.pubSubTopic, msg);
Expand Down
1 change: 1 addition & 0 deletions packages/tests/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export * from "./constants.js";
export * from "./delay.js";
export * from "./log_file.js";
export * from "./nwaku.js";
export * from "./random_array.js";
15 changes: 15 additions & 0 deletions packages/tests/src/random_array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import crypto from "crypto";

export function generateRandomUint8Array(sizeInBytes: number): Uint8Array {
const chunkSize = 65536; // Maximum entropy available
const chunks = Math.ceil(sizeInBytes / chunkSize);
const buffer = new Uint8Array(sizeInBytes);

for (let i = 0; i < chunks; i++) {
const chunk = new Uint8Array(chunkSize);
crypto.getRandomValues(chunk);
buffer.set(chunk, i * chunkSize);
}

return buffer;
}
83 changes: 57 additions & 26 deletions packages/tests/tests/light_push.node.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createEncoder, waitForRemotePeer } from "@waku/core";
import { createLightNode } from "@waku/create";
import type { LightNode } from "@waku/interfaces";
import { LightNode, SendError } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
Expand All @@ -9,6 +9,7 @@ import debug from "debug";
import {
base64ToUtf8,
delay,
generateRandomUint8Array,
makeLogFileName,
MessageRpcResponse,
NOISE_KEY_1,
Expand All @@ -26,24 +27,43 @@ describe("Waku Light Push [node only]", () => {
let waku: LightNode;
let nwaku: Nwaku;

afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});

it("Push successfully", async function () {
this.timeout(15_000);

nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ lightpush: true, relay: true });
const runNodes = async (
context: Mocha.Context,
pubSubTopic?: string
): Promise<void> => {
const nwakuOptional = pubSubTopic ? { topics: pubSubTopic } : {};
nwaku = new Nwaku(makeLogFileName(context));
await nwaku.start({
lightpush: true,
relay: true,
...nwakuOptional,
});

waku = await createLightNode({
pubSubTopic,
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
};

beforeEach(async function () {
this.timeout(15_000);
await runNodes(this);
});

afterEach(async function () {
try {
nwaku?.stop();
waku?.stop();
} catch (e) {
console.error("Failed to stop nodes: ", e);
}
});

it("Push successfully", async function () {
this.timeout(15_000);

const messageText = "Light Push works!";

Expand All @@ -63,28 +83,39 @@ describe("Waku Light Push [node only]", () => {
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
});

it("Push on custom pubsub topic", async function () {
it("Pushes messages equal or less that 1MB", async function () {
this.timeout(15_000);
const MB = 1024 ** 2;

const customPubSubTopic = "/waku/2/custom-dapp/proto";
let pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB),
});
expect(pushResponse.recipients.length).to.greaterThan(0);

nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
lightpush: true,
topics: customPubSubTopic,
relay: true,
pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(65536),
});
expect(pushResponse.recipients.length).to.greaterThan(0);
});

waku = await createLightNode({
pubSubTopic: customPubSubTopic,
staticNoiseKey: NOISE_KEY_1,
it("Fails to push message bigger that 1MB", async function () {
this.timeout(15_000);
const MB = 1024 ** 2;

const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB + 65536),
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.error).to.eq(SendError.SIZE_TOO_BIG);
});

const nimPeerId = await nwaku.getPeerId();
it("Push on custom pubsub topic", async function () {
this.timeout(15_000);

const customPubSubTopic = "/waku/2/custom-dapp/proto";
await runNodes(this, customPubSubTopic);

const nimPeerId = await nwaku.getPeerId();
const messageText = "Light Push works!";

log("Send message via lightpush");
Expand Down
64 changes: 63 additions & 1 deletion packages/tests/tests/relay.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
waitForRemotePeer,
} from "@waku/core";
import { createRelayNode } from "@waku/create";
import type { RelayNode } from "@waku/interfaces";
import { RelayNode, SendError } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
createDecoder as createEciesDecoder,
Expand All @@ -27,6 +27,7 @@ import debug from "debug";
import {
base64ToUtf8,
delay,
generateRandomUint8Array,
makeLogFileName,
MessageRpcResponse,
NOISE_KEY_1,
Expand Down Expand Up @@ -336,6 +337,67 @@ describe("Waku Relay [node only]", () => {
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic);
});

it("Publishes <= 1 MB and rejects others", async function () {
this.timeout(10000);
const MB = 1024 ** 2;

const pubSubTopic = "/some/pubsub/topic";

// 1 and 2 uses a custom pubsub
[waku1, waku2] = await Promise.all([
createRelayNode({
pubSubTopic: pubSubTopic,
staticNoiseKey: NOISE_KEY_1,
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubSubTopic: pubSubTopic,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
}).then((waku) => waku.start().then(() => waku)),
]);

await waku1.libp2p.peerStore.addressBook.set(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await Promise.all([waku1.dial(waku2.libp2p.peerId)]);

await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
]);

const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku2.relay.subscribe([TestDecoder], () =>
resolve({
payload: new Uint8Array([]),
} as DecodedMessage)
);
}
);

let sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(1 * MB),
});
expect(sendResult.recipients.length).to.eq(1);

sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(1 * MB + 65536),
});
expect(sendResult.recipients.length).to.eq(0);
expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG);

sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(2 * MB),
});
expect(sendResult.recipients.length).to.eq(0);
expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG);

const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
expect(waku2ReceivedMsg?.payload?.length).to.eq(0);
});
});

describe("Interop: nwaku", function () {
Expand Down
1 change: 1 addition & 0 deletions packages/utils/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./is_defined.js";
export * from "./random_subset.js";
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";
10 changes: 10 additions & 0 deletions packages/utils/src/common/is_size_valid.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const MB = 1024 ** 2;
const SIZE_CAP = 1; // 1 MB

export const isSizeValid = (payload: Uint8Array): boolean => {
if (payload.length / MB > SIZE_CAP) {
return false;
}

return true;
};