Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

fix: update interfaces #140

Merged
merged 2 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
"extends": "ipfs",
"parserOptions": {
"sourceType": "module"
}
},
"ignorePatterns": [
"*.d.ts"
]
},
"release": {
"branches": [
Expand Down Expand Up @@ -126,7 +129,11 @@
"scripts": {
"lint": "aegir lint",
"dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js",
"build": "tsc",
"build": "tsc && npm run build:copy-proto-files",
"build:copy-proto-files": "mkdirp dist/src/message && cp src/message/*.js src/message/*.d.ts dist/src/message",
"generate": "npm run generate:proto && npm run generate:proto-types",
"generate:proto": "pbjs -t static-module -w es6 -r libp2p-floodsub --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/message/rpc.js ./src/message/rpc.proto",
"generate:proto-types": "pbts -o src/message/rpc.d.ts src/message/rpc.js",
"pretest": "npm run build",
"test": "aegir test -f dist/test",
"test:chrome": "npm run test -- -t browser --cov",
Expand All @@ -138,17 +145,18 @@
"release": "semantic-release"
},
"dependencies": {
"@libp2p/interfaces": "^1.3.6",
"@libp2p/logger": "^1.0.3",
"@libp2p/pubsub": "^1.2.4",
"@libp2p/interfaces": "^1.3.14",
"@libp2p/logger": "^1.1.2",
"@libp2p/pubsub": "^1.2.10",
"protobufjs": "^6.11.2",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.0.8",
"@libp2p/peer-id": "^1.1.3",
"@libp2p/peer-id-factory": "^1.0.5",
"@multiformats/multiaddr": "^10.1.5",
"aegir": "^36.1.1",
"@libp2p/interface-compliance-tests": "^1.1.16",
"@libp2p/peer-id": "^1.1.8",
"@libp2p/peer-id-factory": "^1.0.8",
"@multiformats/multiaddr": "^10.1.7",
"aegir": "^36.1.3",
"multiformats": "^9.4.5",
"p-wait-for": "^4.1.0",
"sinon": "^13.0.1",
Expand Down
51 changes: 37 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { toString } from 'uint8arrays/to-string'
import { PubsubBaseProtocol } from '@libp2p/pubsub'
import { PubSubBaseProtocol } from '@libp2p/pubsub'
import { multicodec } from './config.js'
import { SimpleTimeCache } from './cache.js'
import type { PubSub, PubSubEvents, PubSubOptions, Message } from '@libp2p/interfaces/pubsub'
import type { PubSub, PubSubEvents, PubSubInit, Message, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import { logger } from '@libp2p/logger'
import { RPC } from './message/rpc.js'

const debugName = 'libp2p:floodsub'
const log = logger('libp2p:floodsub')

export { multicodec }

export interface FloodSubOptions extends PubSubOptions {
export interface FloodSubInit extends PubSubInit {
seenTTL?: number
}

Expand All @@ -18,13 +20,12 @@ export interface FloodSubOptions extends PubSubOptions {
* delivering an API for Publish/Subscribe, but with no CastTree Forming
* (it just floods the network).
*/
export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends PubsubBaseProtocol<EventMap> implements PubSub<EventMap & PubSubEvents> {
export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends PubSubBaseProtocol<EventMap> implements PubSub<EventMap & PubSubEvents> {
public seenCache: SimpleTimeCache<boolean>

constructor (options: FloodSubOptions) {
constructor (init?: FloodSubInit) {
super({
...options,
debugName: debugName,
...init,
canRelayMessage: true,
multicodecs: [multicodec]
})
Expand All @@ -35,10 +36,32 @@ export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends Pub
* @type {TimeCache}
*/
this.seenCache = new SimpleTimeCache<boolean>({
validityMs: options.seenTTL ?? 30000
validityMs: init?.seenTTL ?? 30000
})
}

/**
* Decode a Uint8Array into an RPC object
*/
decodeRpc (bytes: Uint8Array): PubSubRPC {
return RPC.decode(bytes)
}

/**
* Encode an RPC object into a Uint8Array
*/
encodeRpc (rpc: PubSubRPC): Uint8Array {
return RPC.encode(rpc).finish()
}

decodeMessage (bytes: Uint8Array): PubSubRPCMessage {
return RPC.Message.decode(bytes)
}

encodeMessage (rpc: PubSubRPCMessage): Uint8Array {
return RPC.Message.encode(rpc).finish()
}

/**
* Process incoming message
* Extends base implementation to check router cache.
Expand All @@ -64,22 +87,22 @@ export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends Pub
const peers = this.getSubscribers(message.topic)

if (peers == null || peers.length === 0) {
this.log('no peers are subscribed to topic %s', message.topic)
log('no peers are subscribed to topic %s', message.topic)
return
}

peers.forEach(id => {
if (this.peerId.equals(id)) {
this.log('not sending message on topic %s to myself', message.topic)
if (this.components.getPeerId().equals(id)) {
log('not sending message on topic %s to myself', message.topic)
return
}

if (id.equals(from)) {
this.log('not sending message on topic %s to sender %p', message.topic, id)
log('not sending message on topic %s to sender %p', message.topic, id)
return
}

this.log('publish msgs on topics %s %p', message.topic, id)
log('publish msgs on topics %s %p', message.topic, id)

this.send(id, { messages: [message] })
})
Expand Down
Loading