Skip to content

Commit

Permalink
chore: store uses the node
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Oct 24, 2024
1 parent 728b3c5 commit b6339f7
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
18 changes: 17 additions & 1 deletion packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Peer, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
PeerIdStr,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
Expand Down Expand Up @@ -75,15 +76,30 @@ export class BaseProtocol implements IBaseProtocolCore {
public async getPeers(
{
numPeers,
maxBootstrapPeers
maxBootstrapPeers,
peerIdStr
}: {
numPeers: number;
maxBootstrapPeers: number;
peerIdStr?: PeerIdStr;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
if (peerIdStr) {
const peer = (await this.connectedPeers()).find(
(p) => p.id.toString() === peerIdStr
);
if (peer) {
return [peer];
}
this.log.warn(
`Passed node to use for ${this.multicodec} not found: ${peerIdStr}. Attempting to use random peers.`
);
return this.getPeers({ numPeers, maxBootstrapPeers });
}

// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();

Expand Down
6 changes: 2 additions & 4 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ export type ProtocolCreateOptions = {
* List of nodes' multiaddrs as strings to use for each protocol. If not specified, random nodes will be used.
* This should be used only if you know what you are doing.
*/
nodesToUse?: {
store?: string[];
filter?: string[];
lightpush?: string[];
nodeToUse?: {
store?: string;
};
};

Expand Down
15 changes: 11 additions & 4 deletions packages/sdk/src/protocols/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ const log = new Logger("waku:store:sdk");
export class Store extends BaseProtocolSDK implements IStore {
public readonly protocol: StoreCore;

public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) {
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private readonly peerIdStrToUse?: string
) {
super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
Expand Down Expand Up @@ -61,9 +65,11 @@ export class Store extends BaseProtocolSDK implements IStore {
const peer = (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
maxBootstrapPeers: 1,
peerIdStr: this.peerIdStrToUse
})
)[0];

if (!peer) {
log.error("No peers available to query");
throw new Error("No peers available to query");
Expand Down Expand Up @@ -237,9 +243,10 @@ export class Store extends BaseProtocolSDK implements IStore {
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/
export function wakuStore(
connectionManager: ConnectionManager
connectionManager: ConnectionManager,
peerIdStrToUse?: string
): (libp2p: Libp2p) => IStore {
return (libp2p: Libp2p) => {
return new Store(connectionManager, libp2p);
return new Store(connectionManager, libp2p, peerIdStrToUse);
};
}
23 changes: 13 additions & 10 deletions packages/sdk/src/waku/waku.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core";
import type {
IFilter,
IHealthManager,
Expand All @@ -10,6 +10,7 @@ import type {
IStore,
IWaku,
Libp2p,
PeerIdStr,
ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
Expand Down Expand Up @@ -106,17 +107,15 @@ export class WakuNode implements IWaku {
this.health = getHealthManager();

if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager);
this.store = store(libp2p);

if (options.nodesToUse?.store) {
this.dialMultiaddr(
options.nodesToUse.store[0],
this.store.protocol.multicodec
).catch((e) => {
let peerIdStr: PeerIdStr | undefined;
if (options.nodeToUse?.store) {
this.dialMultiaddr(options.nodeToUse.store, StoreCodec).catch((e) => {
log.error("Failed to dial store peer", e);
});
}

const store = wakuStore(this.connectionManager, peerIdStr);
this.store = store(libp2p);
}

if (protocolsEnabled.lightpush) {
Expand Down Expand Up @@ -236,9 +235,13 @@ export class WakuNode implements IWaku {
private async dialMultiaddr(
multiaddrStr: string,
protocol: string
): Promise<void> {
): Promise<PeerIdStr> {
const ma = multiaddr(multiaddrStr);
if (!ma.getPeerId()) {
throw new Error("Failed to dial multiaddr: missing peer ID");
}
await this.libp2p.dialProtocol(ma, [protocol]);
return ma.getPeerId()!;
}

private mapToPeerIdOrMultiaddr(
Expand Down

0 comments on commit b6339f7

Please sign in to comment.