Skip to content

Commit

Permalink
feat: add naive request-response for js (libp2p#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee authored Jun 18, 2023
1 parent 6a4eec4 commit 6e2598f
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 30 deletions.
32 changes: 24 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion packages/frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@
"debug": "^4.3.4",
"eslint": "8.35.0",
"eslint-config-next": "13.2.3",
"it-length-prefixed": "^9.0.1",
"it-map": "^3.0.3",
"it-pipe": "^3.0.1",
"libp2p": "^0.45.5",
"next": "13.2.3",
"private-ip": "^3.0.0",
"react": "18.2.0",
"react-dom": "18.2.0",
"typescript": "4.9.5",
"usehooks-ts": "^2.9.1"
"uint8arrays": "^4.0.4",
"usehooks-ts": "^2.9.1",
"uuid": "^9.0.0"
},
"devDependencies": {
"@types/uuid": "^9.0.2",
"autoprefixer": "^10.4.13",
"postcss": "^8.4.21",
"tailwindcss": "^3.2.7"
Expand Down
111 changes: 99 additions & 12 deletions packages/frontend/src/components/chat.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import { useLibp2pContext } from '@/context/ctx'
import React, { ChangeEvent, useCallback, useEffect, useRef, useState } from 'react'
import React, { useCallback, useEffect, useRef, useState } from 'react'
import { Message } from '@libp2p/interface-pubsub'
import { CHAT_TOPIC } from '@/lib/constants'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL } from '@/lib/constants'
import { createIcon } from '@download/blockies'
import { ChatMessage, useChatContext } from '../context/chat-ctx'

import { v4 as uuidv4 } from 'uuid';
import { ChatFile, useFileChatContext } from '@/context/file-ctx'
import { pipe } from 'it-pipe'
import map from 'it-map'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { multiaddr, Multiaddr } from '@multiformats/multiaddr'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import * as lp from 'it-length-prefixed'
import { peerIdFromString } from '@libp2p/peer-id'

interface MessageProps extends ChatMessage { }

Expand Down Expand Up @@ -45,30 +53,81 @@ function Message({ msg, from, peerId }: MessageProps) {
export default function ChatContainer() {
const { libp2p } = useLibp2pContext()
const { messageHistory, setMessageHistory } = useChatContext();
const { files, setFiles } = useFileChatContext();
const [input, setInput] = useState<string>('')
const fileRef = useRef<HTMLInputElement>(null);

// Effect hook to subscribe to pubsub events and update the message state hook
useEffect(() => {
const messageCB = (evt: CustomEvent<Message>) => {
const messageCB = async (evt: CustomEvent<Message>) => {
console.log('gossipsub console log', evt.detail)
// FIXME: Why does 'from' not exist on type 'Message'?
const { topic, data } = evt.detail
const msg = new TextDecoder().decode(data)
console.log(`${topic}: ${msg}`)

// Append signed messages, otherwise discard
if (evt.detail.type === 'signed') {
setMessageHistory([...messageHistory, { msg, from: 'other', peerId: evt.detail.from.toString() }])
if (topic === CHAT_TOPIC) {
const msg = new TextDecoder().decode(data)
console.log(`${topic}: ${msg}`)

// Append signed messages, otherwise discard
if (evt.detail.type === 'signed') {
setMessageHistory([...messageHistory, { msg, from: 'other', peerId: evt.detail.from.toString() }])
}
} else if (topic === CHAT_FILE_TOPIC) {
const { fileId, provider } = JSON.parse(new TextDecoder().decode(data))
console.log(`fileId:${fileId}, provider:${provider}`)

const stream = await libp2p.dialProtocol(peerIdFromString(provider), FILE_EXCHANGE_PROTOCOL)
await pipe(
[uint8ArrayFromString(fileId)],
(source) => lp.encode(source),
stream,
(source) => lp.decode(source),
async function(source) {
for await (const data of source) {
const resp = uint8ArrayToString(data.subarray())
console.log(`RESPONSE RECEIVED: ${resp.length}`)

const objectUrl = window.URL.createObjectURL(new Blob([resp]))
const msg: ChatMessage = {
msg: [
`File ${resp.length} bytes\n`,
<a href={objectUrl} > <b>Download</b></a >],
from: "other",
peerId: provider,
}
setMessageHistory([...messageHistory, msg])
}
}
)
// stream.close()
} else {
console.log(`Unexpected gossipsub topic: ${topic}`)
}
}

libp2p.services.pubsub.addEventListener('message', messageCB)

libp2p.handle(FILE_EXCHANGE_PROTOCOL, ({ stream }) => {
pipe(
stream.source,
(source) => lp.decode(source),
(source) => map(source, async (msg) => {
const fileId = uint8ArrayToString(msg.subarray())
console.log(`REQUEST RECEIVED: fileId:${fileId}, source:${stream.source}`)
const file = files.get(fileId)!
return file.body
}),
(source) => lp.encode(source),
stream.sink,
)
// stream.close()
})

return () => {
// Cleanup handlers 👇
// libp2p.pubsub.unsubscribe(CHAT_TOPIC)
libp2p.services.pubsub.removeEventListener('message', messageCB)
libp2p.unhandle(FILE_EXCHANGE_PROTOCOL)
}
}, [libp2p, messageHistory, setMessageHistory])

Expand All @@ -95,6 +154,35 @@ export default function ChatContainer() {
setInput('')
}, [input, messageHistory, setInput, libp2p, setMessageHistory])

const sendFile = useCallback(async (readerEvent: ProgressEvent<FileReader>) => {
const result = readerEvent.target?.result as ArrayBuffer;
console.log(`READER_RESULT: ${result.byteLength} bytes`);

const myPeerId = libp2p.peerId.toString()
const file: ChatFile = {
id: uuidv4(),
body: new Uint8Array(result),
provider: myPeerId,
}
setFiles(files.set(file.id, file))

console.log(
'peers in gossip:',
libp2p.services.pubsub.getSubscribers(CHAT_FILE_TOPIC).toString(),
)

const res = await libp2p.services.pubsub.publish(
CHAT_FILE_TOPIC,
new TextEncoder().encode(JSON.stringify({ fileId: file.id, provider: myPeerId }))
)
console.log(
'sent file to: ',
res.recipients.map((peerId) => peerId.toString()),
)

setMessageHistory([...messageHistory, { msg: input, from: 'me', peerId: myPeerId }])
}, [messageHistory, libp2p, setMessageHistory])

const handleKeyUp = useCallback(
async (e: React.KeyboardEvent<HTMLInputElement>) => {
if (e.key !== 'Enter') {
Expand Down Expand Up @@ -125,12 +213,11 @@ export default function ChatContainer() {
const reader = new FileReader();
reader.readAsArrayBuffer(e.target.files[0]);
reader.onload = (readerEvent) => {
const result = readerEvent.target?.result as ArrayBuffer;
console.log(`READER_RESULT: ${result.byteLength} bytes`);
sendFile(readerEvent)
};
}
},
[],
[sendFile],
)

const handleFileSend = useCallback(
Expand Down
31 changes: 31 additions & 0 deletions packages/frontend/src/context/file-ctx.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import React, { createContext, useContext, useState } from 'react';

export interface ChatFile {
id: string
body: Uint8Array
provider: string
}

export interface FileChatContextInterface {
files: Map<string, ChatFile>
setFiles: (files: Map<string, ChatFile>) => void;
}
export const fileContext = createContext<FileChatContextInterface>({
files: new Map<string, ChatFile>(),
setFiles: () => { }
})

export const useFileChatContext = () => {
return useContext(fileContext);
};

export const FileProvider = ({ children }: any) => {
const [files, setFiles] = useState<Map<string, ChatFile>>(new Map<string, ChatFile>());

return (
<fileContext.Provider value={{ files, setFiles }}>
{children}
</fileContext.Provider>
);
};

4 changes: 3 additions & 1 deletion packages/frontend/src/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export const CHAT_TOPIC = "universal-connectivity"
export const CHAT_FILE_TOPIC = "universal-connectivity-file"
export const FILE_EXCHANGE_PROTOCOL = "/universal-connectivity-file/1"

export const CIRCUIT_RELAY_CODE = 290

export const WEBRTC_BOOTSTRAP_NODE = "/ip4/18.195.246.16/udp/9090/webrtc-direct/certhash/uEiA8EDMfADmULSe2Bm1vVDSmN2RQPvY5MXkEZVOSyD1y2w/p2p/12D3KooWSmtsbL2ukwVwf8gDoTYZHnCd7sVNNVdMnCa4MkWjLujm"
export const WEBRTC_BOOTSTRAP_NODE = "/ip4/192.168.44.145/udp/9090/webrtc-direct/certhash/uEiAj4WmbJoD2axUNfPT17inKyoruigvq-RxnlARLjeYnUQ/p2p/12D3KooWNrDx6wbjyUUjaZVAjmAnc1sFQJjig6z3GaE9N4jrvQnS"
export const WEBTRANSPORT_BOOTSTRAP_NODE = "/ip4/3.125.128.80/udp/9095/quic-v1/webtransport/certhash/uEiAGIlVdiajNz0k1RHjrxlNXN5bb7W4dLPvMJYUrGJ9ZUQ/certhash/uEiDYZsZoO8vuTKlPhxvVR5SFwOkbXfjlsmTLUHNlnG24bg/p2p/12D3KooWEymoJRHaxizLrrKgJ9MhEYpG85fQ7HReRMJuEMLqmNMg"
10 changes: 6 additions & 4 deletions packages/frontend/src/lib/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { bootstrap } from '@libp2p/bootstrap'
import { kadDHT } from '@libp2p/kad-dht'
import type { PeerId } from '@libp2p/interface-peer-id'
import {
multiaddr,
Multiaddr,
Expand All @@ -14,7 +15,7 @@ import { gossipsub } from '@chainsafe/libp2p-gossipsub'
import { webSockets } from '@libp2p/websockets'
import { webTransport } from '@libp2p/webtransport'
import { webRTC, webRTCDirect } from '@libp2p/webrtc'
import { CHAT_TOPIC, CIRCUIT_RELAY_CODE, WEBRTC_BOOTSTRAP_NODE, WEBTRANSPORT_BOOTSTRAP_NODE } from './constants'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, CIRCUIT_RELAY_CODE, FILE_EXCHANGE_PROTOCOL, WEBRTC_BOOTSTRAP_NODE, WEBTRANSPORT_BOOTSTRAP_NODE } from './constants'
import * as filters from "@libp2p/websockets/filters"
import { circuitRelayTransport } from 'libp2p/circuit-relay'

Expand All @@ -35,7 +36,7 @@ export async function startLibp2p() {
}),
webRTC({
rtcConfiguration: {
iceServers:[{
iceServers: [{
urls: [
'stun:stun.l.google.com:19302',
'stun:global.stun.twilio.com:3478'
Expand Down Expand Up @@ -82,8 +83,9 @@ export async function startLibp2p() {
})

libp2p.services.pubsub.subscribe(CHAT_TOPIC)
libp2p.services.pubsub.subscribe(CHAT_FILE_TOPIC)

libp2p.addEventListener('self:peer:update', ({detail: { peer }}) => {
libp2p.addEventListener('self:peer:update', ({ detail: { peer } }) => {
const multiaddrs = peer.addresses.map(({ multiaddr }) => multiaddr)

console.log(`changed multiaddrs: peer ${peer.id.toString()} multiaddrs: ${multiaddrs}`)
Expand Down Expand Up @@ -115,5 +117,5 @@ export const connectToMultiaddr =
console.error(e)
throw e
}
}
}

8 changes: 4 additions & 4 deletions rust-peer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ fn create_swarm(
)
.expect("Correct configuration");

// Create a Gossipsub topic
// Create/subscribe Gossipsub topics
let topic = gossipsub::IdentTopic::new("universal-connectivity");

// subscribes to our topic
gossipsub.subscribe(&topic)?;
let topic = gossipsub::IdentTopic::new("universal-connectivity-file");
gossipsub.subscribe(&topic)?;

let transport = {
Expand Down Expand Up @@ -421,7 +421,7 @@ pub struct FileResponse(Vec<u8>);

impl ProtocolName for FileExchangeProtocol {
fn protocol_name(&self) -> &[u8] {
"/file-exchange/1".as_bytes()
"/universal-connectivity-file/1".as_bytes()
}
}

Expand Down

0 comments on commit 6e2598f

Please sign in to comment.