Skip to content

Commit

Permalink
feat: Logger with log levels (#1672)
Browse files Browse the repository at this point in the history
* setup a custom Logger with log level support

* refactor codebase for to use new Logger with log levels

* disallow usage of `debug` directly / only allow usage in/through custom Logger

* remove `debug` from logger
  • Loading branch information
danisharora099 committed Oct 20, 2023
1 parent 7b1b3c5 commit 9ed7fa8
Show file tree
Hide file tree
Showing 37 changed files with 306 additions and 216 deletions.
9 changes: 9 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
],
"globals": { "BigInt": true, "console": true, "WebAssembly": true },
"rules": {
"no-restricted-imports": [
"error",
{
"paths": [{
"name": "debug",
"message": "The usage of 'debug' package directly is disallowed. Please use the custom logger from @waku/utils instead."
}]
}
],
"prettier/prettier": [
"error",
{
Expand Down
50 changes: 28 additions & 22 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import {
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import { shardInfoToPubSubTopics } from "@waku/utils";
import debug from "debug";
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";

const log = debug("waku:connection-manager");
const log = new Logger("connection-manager");

export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
Expand Down Expand Up @@ -128,14 +128,16 @@ export class ConnectionManager
this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay);

this.run()
.then(() => log(`Connection Manager is now running`))
.catch((error) => log(`Unexpected error while running service`, error));
.then(() => log.info(`Connection Manager is now running`))
.catch((error) =>
log.error(`Unexpected error while running service`, error)
);

// libp2p emits `peer:discovery` events during its initialization
// which means that before the ConnectionManager is initialized, some peers may have been discovered
// we will dial the peers in peerStore ONCE before we start to listen to the `peer:discovery` events within the ConnectionManager
this.dialPeerStorePeers().catch((error) =>
log(`Unexpected error while dialing peer store peers`, error)
log.error(`Unexpected error while dialing peer store peers`, error)
);
}

Expand All @@ -153,7 +155,7 @@ export class ConnectionManager
try {
await Promise.all(dialPromises);
} catch (error) {
log(`Unexpected error while dialing peer store peers`, error);
log.error(`Unexpected error while dialing peer store peers`, error);
}
}

Expand Down Expand Up @@ -185,7 +187,9 @@ export class ConnectionManager
let dialAttempt = 0;
while (dialAttempt < this.options.maxDialAttemptsForPeer) {
try {
log(`Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}`);
log.info(
`Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}`
);
await this.libp2p.dial(peerId);

const tags = await this.getTagNamesForPeer(peerId);
Expand All @@ -204,10 +208,12 @@ export class ConnectionManager
} catch (error) {
if (error instanceof AggregateError) {
// Handle AggregateError
log(`Error dialing peer ${peerId.toString()} - ${error.errors}`);
log.error(
`Error dialing peer ${peerId.toString()} - ${error.errors}`
);
} else {
// Handle generic error
log(
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message

Check warning on line 218 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}`
Expand All @@ -233,18 +239,18 @@ export class ConnectionManager
let errorMessage;
if (error instanceof AggregateError) {
if (!error.errors) {
log(`No errors array found for AggregateError`);
log.warn(`No errors array found for AggregateError`);
} else if (error.errors.length === 0) {
log(`Errors array is empty for AggregateError`);
log.warn(`Errors array is empty for AggregateError`);
} else {
errorMessage = JSON.stringify(error.errors[0]);
}
} else {
errorMessage = error.message;
}

log(
`Deleting undialable peer ${peerId.toString()} from peer store. Error: ${errorMessage}`
log.info(
`Deleting undialable peer ${peerId.toString()} from peer store. Reason: ${errorMessage}`
);
}

Expand All @@ -262,9 +268,9 @@ export class ConnectionManager
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
log(`Dropped connection with peer ${peerId.toString()}`);
log.info(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) {
log(
log.error(
`Error dropping connection with peer ${peerId.toString()} - ${error}`
);
}
Expand All @@ -278,7 +284,7 @@ export class ConnectionManager
const peerId = this.pendingPeerDialQueue.shift();
if (!peerId) return;
this.attemptDial(peerId).catch((error) => {
log(error);
log.error(error);
});
}
}
Expand Down Expand Up @@ -325,7 +331,7 @@ export class ConnectionManager
}

this.dialPeer(peerId).catch((err) => {
log(`Error dialing peer ${peerId.toString()} : ${err}`);
log.error(`Error dialing peer ${peerId.toString()} : ${err}`);
});
}

Expand All @@ -339,7 +345,7 @@ export class ConnectionManager
try {
await this.attemptDial(peerId);
} catch (error) {
log(`Error dialing peer ${peerId.toString()} : ${error}`);
log.error(`Error dialing peer ${peerId.toString()} : ${error}`);
}
})();
},
Expand Down Expand Up @@ -408,7 +414,7 @@ export class ConnectionManager
// if we're already connected to the peer, don't dial
const isConnected = this.libp2p.getConnections(peerId).length > 0;
if (isConnected) {
log(`Already connected to peer ${peerId.toString()}. Not dialing.`);
log.warn(`Already connected to peer ${peerId.toString()}. Not dialing.`);
return false;
}

Expand All @@ -418,7 +424,7 @@ export class ConnectionManager
peerId,
this.libp2p.peerStore
);
log(
log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubSubTopics
}).
Expand All @@ -429,7 +435,7 @@ export class ConnectionManager

// if the peer is not dialable based on bootstrap status, don't dial
if (!(await this.isPeerDialableBasedOnBootstrapStatus(peerId))) {
log(
log.warn(
`Peer ${peerId.toString()} is not dialable based on bootstrap status. Not dialing.`
);
return false;
Expand Down Expand Up @@ -498,7 +504,7 @@ export class ConnectionManager
const peer = await this.libp2p.peerStore.get(peerId);
return Array.from(peer.tags.keys());
} catch (error) {
log(`Failed to get peer ${peerId}, error: ${error}`);
log.error(`Failed to get peer ${peerId}, error: ${error}`);
return [];
}
}
Expand Down
35 changes: 18 additions & 17 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
groupByContentTopic,
toAsyncIterator
} from "@waku/utils";
import debug from "debug";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
Expand All @@ -36,7 +36,7 @@ import {
FilterSubscribeRpc
} from "./filter_rpc.js";

const log = debug("waku:filter:v2");
const log = new Logger("filter:v2");

type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
Expand Down Expand Up @@ -108,7 +108,7 @@ class Subscription {
);
}

log(
log.info(
"Subscribed to peer ",
this.peer.id.toString(),
"for content topics",
Expand Down Expand Up @@ -183,9 +183,9 @@ class Subscription {
);
}

log("Ping successful");
log.info("Ping successful");
} catch (error) {
log("Error pinging: ", error);
log.error("Error pinging: ", error);
throw new Error("Error pinging: " + error);
}
}
Expand Down Expand Up @@ -216,7 +216,7 @@ class Subscription {
}

this.subscriptionCallbacks.clear();
log("Unsubscribed from all content topics");
log.info("Unsubscribed from all content topics");
} catch (error) {
throw new Error("Error unsubscribing from all content topics: " + error);
}
Expand All @@ -226,7 +226,7 @@ class Subscription {
const contentTopic = message.contentTopic;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
log("No subscription callback available for ", contentTopic);
log.error("No subscription callback available for ", contentTopic);
return;
}
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
Expand Down Expand Up @@ -260,7 +260,7 @@ class Filter extends BaseProtocol implements IReceiver {
this.pubsubTopics = options?.pubsubTopics || [DefaultPubSubTopic];

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
log.error("Failed to register ", FilterCodecs.PUSH, e);
});

this.activeSubscriptions = new Map();
Expand Down Expand Up @@ -332,7 +332,6 @@ class Filter extends BaseProtocol implements IReceiver {
}

private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
pipe(streamData.stream, lp.decode, async (source) => {
for await (const bytes of source) {
Expand All @@ -341,12 +340,12 @@ class Filter extends BaseProtocol implements IReceiver {
const { pubsubTopic, wakuMessage } = response;

if (!wakuMessage) {
log("Received empty message");
log.error("Received empty message");
return;
}

if (!pubsubTopic) {
log("PubSub topic missing from push message");
log.error("PubSub topic missing from push message");
return;
}

Expand All @@ -357,22 +356,24 @@ class Filter extends BaseProtocol implements IReceiver {
);

if (!subscription) {
log(`No subscription locally registered for topic ${pubsubTopic}`);
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}

await subscription.processMessage(wakuMessage);
}
}).then(
() => {
log("Receiving pipe closed.");
log.info("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
log.error("Error with receiving pipe", e);
}
);
} catch (e) {
log("Error decoding message", e);
log.error("Error decoding message", e);
}
}
}
Expand All @@ -392,7 +393,7 @@ async function pushMessage<T extends IDecodedMessage>(

const { contentTopic } = message;
if (!contentTopic) {
log("Message has no content topic, skipping");
log.warn("Message has no content topic, skipping");
return;
}

Expand All @@ -407,6 +408,6 @@ async function pushMessage<T extends IDecodedMessage>(

await callback(decodedMessage);
} catch (e) {
log("Error decoding message", e);
log.error("Error decoding message", e);
}
}
16 changes: 8 additions & 8 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { IRelay, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
import type { PingService } from "libp2p/ping";

import { createEncoder } from "./message/version_0.js";

export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");
const log = new Logger("keep-alive");

export class KeepAliveManager {
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
Expand Down Expand Up @@ -48,9 +48,9 @@ export class KeepAliveManager {
// also update the peer store with the latency
try {
ping = await libp2pPing.ping(peerId);
log(`Ping succeeded (${peerIdStr})`, ping);
log.info(`Ping succeeded (${peerIdStr})`, ping);
} catch (error) {
log(`Ping failed for peer (${peerIdStr}).
log.error(`Ping failed for peer (${peerIdStr}).
Next ping will be attempted in ${pingPeriodSecs} seconds.
`);
return;
Expand All @@ -63,10 +63,10 @@ export class KeepAliveManager {
}
});
} catch (e) {
log("Failed to update ping", e);
log.error("Failed to update ping", e);
}
} catch (e) {
log(`Ping failed (${peerIdStr})`, e);
log.error(`Ping failed (${peerIdStr})`, e);
}
})();
}, pingPeriodSecs * 1000);
Expand Down Expand Up @@ -128,10 +128,10 @@ export class KeepAliveManager {
ephemeral: true
});
const interval = setInterval(() => {
log("Sending Waku Relay ping message");
log.info("Sending Waku Relay ping message");
relay
.send(encoder, { payload: new Uint8Array([1]) })
.catch((e) => log("Failed to send relay ping", e));
.catch((e) => log.error("Failed to send relay ping", e));
}, relayPeriodSecs * 1000);
intervals.push(interval);
}
Expand Down
Loading

0 comments on commit 9ed7fa8

Please sign in to comment.