Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 1MB restriction to LightPush and Relay #1351

Merged
merged 14 commits into from
May 17, 2023
362 changes: 182 additions & 180 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
SendResult,
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isSizeValid } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand Down Expand Up @@ -49,6 +50,11 @@ class LightPush extends BaseProtocol implements ILightPush {
const recipients: PeerId[] = [];

try {
if (!isSizeValid(message.payload)) {
log("Failed to send waku light push: message is bigger that 1MB");
return { recipients };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to start return errors in this result?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. I'd still rather not throw, as hybrid let's put error codes there.

}

const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
Expand Down
11 changes: 9 additions & 2 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type {
ProtocolOptions,
SendResult,
} from "@waku/interfaces";
import { groupByContentTopic, toAsyncIterator } from "@waku/utils";
import { groupByContentTopic, isSizeValid, toAsyncIterator } from "@waku/utils";
import debug from "debug";

import { RelayCodecs } from "./constants.js";
Expand Down Expand Up @@ -97,10 +97,17 @@ class Relay implements IRelay {
* Send Waku message.
*/
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const emptySendResult = { recipients: [] };

if (!isSizeValid(message.payload)) {
log("Failed to send waku relay: message is bigger that 1MB");
return emptySendResult;
}

const msg = await encoder.toWire(message);
if (!msg) {
log("Failed to encode message, aborting publish");
return { recipients: [] };
return emptySendResult;
}

return this.gossipSub.publish(this.pubSubTopic, msg);
Expand Down
1 change: 1 addition & 0 deletions packages/tests/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export * from "./constants.js";
export * from "./delay.js";
export * from "./log_file.js";
export * from "./nwaku.js";
export * from "./random_array.js";
15 changes: 15 additions & 0 deletions packages/tests/src/random_array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import crypto from "crypto";

export function generateRandomUint8Array(sizeInBytes: number): Uint8Array {
const chunkSize = 65536; // Maximum entropy available
const chunks = Math.ceil(sizeInBytes / chunkSize);
const buffer = new Uint8Array(sizeInBytes);

for (let i = 0; i < chunks; i++) {
const chunk = new Uint8Array(chunkSize);
crypto.getRandomValues(chunk);
buffer.set(chunk, i * chunkSize);
}

return buffer;
}
80 changes: 55 additions & 25 deletions packages/tests/tests/light_push.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import debug from "debug";
import {
base64ToUtf8,
delay,
generateRandomUint8Array,
makeLogFileName,
MessageRpcResponse,
NOISE_KEY_1,
Expand All @@ -26,24 +27,43 @@ describe("Waku Light Push [node only]", () => {
let waku: LightNode;
let nwaku: Nwaku;

afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});

it("Push successfully", async function () {
this.timeout(15_000);

nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ lightpush: true, relay: true });
const runNodes = async (
context: Mocha.Context,
pubSubTopic?: string
): Promise<void> => {
const nwakuOptional = pubSubTopic ? { topics: pubSubTopic } : {};
nwaku = new Nwaku(makeLogFileName(context));
await nwaku.start({
lightpush: true,
relay: true,
...nwakuOptional,
});

waku = await createLightNode({
pubSubTopic,
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
};

beforeEach(async function () {
this.timeout(15_000);
await runNodes(this);
});

afterEach(async function () {
try {
nwaku?.stop();
waku?.stop();
} catch (e) {
console.error("Failed to stop nodes: ", e);
}
});

it("Push successfully", async function () {
this.timeout(15_000);

const messageText = "Light Push works!";

Expand All @@ -63,28 +83,38 @@ describe("Waku Light Push [node only]", () => {
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
});

it("Push on custom pubsub topic", async function () {
it("Pushes messages equal or less that 1MB", async function () {
this.timeout(15_000);
const MB = 1024 ** 2;

const customPubSubTopic = "/waku/2/custom-dapp/proto";
let pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB),
});
expect(pushResponse.recipients.length).to.greaterThan(0);

nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
lightpush: true,
topics: customPubSubTopic,
relay: true,
pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(65536),
});
expect(pushResponse.recipients.length).to.greaterThan(0);
});

waku = await createLightNode({
pubSubTopic: customPubSubTopic,
staticNoiseKey: NOISE_KEY_1,
it("Fails to push message bigger that 1MB", async function () {
this.timeout(15_000);
const MB = 1024 ** 2;

const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB + 65536),
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
expect(pushResponse.recipients.length).to.eq(0);
});

const nimPeerId = await nwaku.getPeerId();
it("Push on custom pubsub topic", async function () {
this.timeout(15_000);

const customPubSubTopic = "/waku/2/custom-dapp/proto";
await runNodes(this, customPubSubTopic);

const nimPeerId = await nwaku.getPeerId();
const messageText = "Light Push works!";

log("Send message via lightpush");
Expand Down
60 changes: 60 additions & 0 deletions packages/tests/tests/relay.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import debug from "debug";
import {
base64ToUtf8,
delay,
generateRandomUint8Array,
makeLogFileName,
MessageRpcResponse,
NOISE_KEY_1,
Expand Down Expand Up @@ -336,6 +337,65 @@ describe("Waku Relay [node only]", () => {
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic);
});

it("Publishes <= 1 MB and rejects others", async function () {
this.timeout(10000);
const MB = 1024 ** 2;

const pubSubTopic = "/some/pubsub/topic";

// 1 and 2 uses a custom pubsub
[waku1, waku2] = await Promise.all([
createRelayNode({
pubSubTopic: pubSubTopic,
staticNoiseKey: NOISE_KEY_1,
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubSubTopic: pubSubTopic,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
}).then((waku) => waku.start().then(() => waku)),
]);

await waku1.libp2p.peerStore.addressBook.set(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await Promise.all([waku1.dial(waku2.libp2p.peerId)]);

await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
]);

const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku2.relay.subscribe([TestDecoder], () =>
resolve({
payload: new Uint8Array([]),
} as DecodedMessage)
);
}
);

let sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(1 * MB),
});
expect(sendResult.recipients.length).to.eq(1);

sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(1 * MB + 65536),
});
expect(sendResult.recipients.length).to.eq(0);

sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(2 * MB),
});
expect(sendResult.recipients.length).to.eq(0);

const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
expect(waku2ReceivedMsg?.payload?.length).to.eq(0);
});
});

describe("Interop: nwaku", function () {
Expand Down
1 change: 1 addition & 0 deletions packages/utils/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./is_defined.js";
export * from "./random_subset.js";
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";
10 changes: 10 additions & 0 deletions packages/utils/src/common/is_size_valid.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const MB = 1024 ** 2;
const SIZE_CAP = 1; // 1 MB

export const isSizeValid = (payload: Uint8Array): boolean => {
if (payload.length / MB > SIZE_CAP) {
return false;
}

return true;
};