Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataStream support #1301

Open
wants to merge 50 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
6008b45
wip file send
lukasIO Oct 23, 2024
ce98846
stream reader api
lukasIO Oct 24, 2024
79bd754
add text stream APIs
lukasIO Oct 24, 2024
3d3b794
add topic check
lukasIO Oct 24, 2024
18bc2fc
add streamText method
lukasIO Oct 25, 2024
7a33731
text streaming greeting
lukasIO Oct 25, 2024
3070872
refactor
lukasIO Oct 25, 2024
be1fb19
simplify
lukasIO Oct 25, 2024
3d9242a
wording
lukasIO Oct 25, 2024
2bb8207
handle engine close
lukasIO Oct 25, 2024
f31ef37
file attachements for text messages
lukasIO Nov 5, 2024
7c68d06
add complete at the end
lukasIO Nov 5, 2024
89bcfe0
wip
lukasIO Nov 8, 2024
7e2e6a0
Merge branch 'main' into lukas/file-send
lukasIO Nov 22, 2024
b954c31
wip add progress
lukasIO Nov 22, 2024
903b8fd
Add stream reader progress support
lukasIO Nov 25, 2024
9812835
update proposal naming
lukasIO Nov 25, 2024
e99aad0
bigint helpers
lukasIO Nov 25, 2024
8e5b71a
fix import
lukasIO Nov 25, 2024
bc647db
add progress for sending
lukasIO Nov 25, 2024
e8deef7
fix progress calc
lukasIO Nov 25, 2024
3b06277
use correct protocol version
lukasIO Nov 28, 2024
759f272
update mutex dep
lukasIO Nov 28, 2024
86d589a
Merge branch 'main' into lukas/file-send
lukasIO Nov 29, 2024
f247795
Merge branch 'main' into lukas/file-send
lukasIO Dec 6, 2024
e5100f4
wip
lukasIO Dec 6, 2024
c660777
new reader api
lukasIO Dec 6, 2024
cde351e
use actual protocol
lukasIO Dec 6, 2024
2134144
custom streamwriter
lukasIO Dec 6, 2024
1a7ca88
handle engine close during stream
lukasIO Dec 6, 2024
a0c6862
address comments
lukasIO Dec 8, 2024
ae700cd
address more comments
lukasIO Dec 10, 2024
9965121
naming
lukasIO Dec 18, 2024
a77aaf0
fix demo
lukasIO Dec 21, 2024
9663276
remove refs to totalChunks
lukasIO Dec 22, 2024
37b3c34
Merge branch 'main' into lukas/file-send
lukasIO Jan 10, 2025
0167f80
include info in streamwriter
lukasIO Jan 10, 2025
3648c7b
return id for file and text
lukasIO Jan 10, 2025
5e1d9d6
stream trailer support
lukasIO Jan 10, 2025
9a8cd8f
Merge branch 'main' into lukas/file-send
lukasIO Jan 13, 2025
15d266e
update protocol
lukasIO Jan 13, 2025
d5f367b
fix import
lukasIO Jan 17, 2025
c2a0c5a
Create nine-lies-invent.md
lukasIO Jan 17, 2025
230a61d
Change FileStream to ByteStream
lukasIO Jan 21, 2025
6087e8e
Merge branch 'lukas/file-send' of github.com:livekit/client-sdk-js in…
lukasIO Jan 21, 2025
2cf7fd2
Remove stream events and replace with topic callbacks (#1373)
lukasIO Jan 22, 2025
e39445a
Merge branch 'main' into lukas/file-send
lukasIO Jan 22, 2025
b9d8031
Merge branch 'lukas/file-send' of github.com:livekit/client-sdk-js in…
lukasIO Jan 22, 2025
d09d5e7
update protocol
lukasIO Jan 22, 2025
a55a232
throw error if stream handler has been registered already
lukasIO Jan 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nine-lies-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": minor
---

Add DataStream support
103 changes: 89 additions & 14 deletions examples/demo/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import {
supportsAV1,
supportsVP9,
} from '../../src/index';
import { isSVCCodec } from '../../src/room/utils';
import { isSVCCodec, sleep } from '../../src/room/utils';

setLogLevel(LogLevel.debug);

Expand All @@ -48,6 +48,7 @@ const state = {
defaultDevices: new Map<MediaDeviceKind, string>([['audioinput', 'default']]),
bitrateInterval: undefined as any,
e2eeKeyProvider: new ExternalE2EEKeyProvider(),
chatMessages: new Map<string, { text: string; participant?: Participant }>(),
};
let currentRoom: Room | undefined;

Expand All @@ -72,6 +73,15 @@ function updateSearchParams(url: string, token: string, key: string) {

// handles actions from the HTML
const appActions = {
sendFile: async () => {
console.log('start sending');
const file = ($('file') as HTMLInputElement).files?.[0]!;
currentRoom?.localParticipant.sendFile(file, {
mimeType: file.type,
topic: 'welcome',
onProgress: (progress) => console.log('sending file, progress', Math.ceil(progress * 100)),
});
},
connectWithFormInput: async () => {
const url = (<HTMLInputElement>$('url')).value;
const token = (<HTMLInputElement>$('token')).value;
Expand Down Expand Up @@ -234,6 +244,50 @@ const appActions = {
);
});

room.setTextStreamHandler(async (reader, participant) => {
const info = reader.info;
if (info.size) {
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: await reader.readAll(),
},
room.getParticipantByIdentity(participant?.identity),
);
} else {
for await (const msg of reader) {
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: msg.collected,
},
room.getParticipantByIdentity(participant?.identity),
);
}
appendLog('text stream finished');
}
console.log('final info including close extensions', reader.info);
}, 'chat');

room.setByteStreamHandler(async (reader, participant) => {
const info = reader.info;

appendLog(`started to receive a file called "${info.name}" from ${participant?.identity}`);
reader.onProgress = (progress) => {
console.log(`"progress ${progress ? (progress * 100).toFixed(0) : 'undefined'}%`);
};
const result = new Blob(await reader.readAll(), { type: info.mimeType });
appendLog(`completely received file called "${info.name}" from ${participant?.identity}`);
const downloadLink = URL.createObjectURL(result);
const linkEl = document.createElement('a');
linkEl.href = downloadLink;
linkEl.innerText = info.name;
linkEl.setAttribute('download', info.name);
document.body.append(linkEl);
}, 'welcome');

try {
// read and set current key from input
const cryptoKey = (<HTMLSelectElement>$('crypto-key')).value;
Expand Down Expand Up @@ -295,32 +349,32 @@ const appActions = {
const cssRules = [...styleSheet.cssRules].map((rule) => rule.cssText).join('');
const style = document.createElement('style');
style.textContent = cssRules;
pipWindow.document.head.appendChild(style);
pipWindow?.document.head.appendChild(style);
} catch (e) {
const link = document.createElement('link');
link.rel = 'stylesheet';
link.type = styleSheet.type;
link.media = styleSheet.media;
link.href = styleSheet.href;
pipWindow.document.head.appendChild(link);
link.href = styleSheet.href!;
pipWindow?.document.head.appendChild(link);
}
});
// Move participant videos to the Picture-in-Picture window
const participantsArea = $('participants-area');
const pipParticipantsArea = document.createElement('div');
pipParticipantsArea.id = 'participants-area';
pipWindow.document.body.append(pipParticipantsArea);
pipWindow?.document.body.append(pipParticipantsArea);
[...participantsArea.children].forEach((child) => pipParticipantsArea.append(child));

// Move participant videos back when the Picture-in-Picture window closes.
pipWindow.addEventListener('pagehide', (event) => {
pipWindow?.addEventListener('pagehide', () => {
setButtonState('toggle-pip-button', 'Open PiP', false);
if (currentRoom?.state === ConnectionState.Connected)
[...pipParticipantsArea.children].forEach((child) => participantsArea.append(child));
});

// Close PiP when room disconnects
currentRoom.once('disconnected', (e) => window.documentPictureInPicture?.window.close());
currentRoom!.once('disconnected', () => window.documentPictureInPicture?.window?.close());
},

ratchetE2EEKey: async () => {
Expand Down Expand Up @@ -402,7 +456,7 @@ const appActions = {
if (!currentRoom) return;
const textField = <HTMLInputElement>$('entry');
if (textField.value) {
currentRoom.localParticipant.sendChatMessage(textField.value);
currentRoom.localParticipant.sendText(textField.value, { topic: 'chat' });
textField.value = '';
}
},
Expand Down Expand Up @@ -492,13 +546,32 @@ declare global {
window.appActions = appActions;

// --------------------------- event handlers ------------------------------- //
function handleChatMessage(msg: ChatMessage, participant?: Participant) {
state.chatMessages.set(msg.id, { text: msg.message, participant });

function handleChatMessage(msg: ChatMessage, participant?: LocalParticipant | RemoteParticipant) {
(<HTMLTextAreaElement>$('chat')).value +=
`${participant?.identity}${participant instanceof LocalParticipant ? ' (me)' : ''}: ${msg.message}\n`;
const chatEl = <HTMLTextAreaElement>$('chat');
chatEl.value = '';
for (const chatMsg of state.chatMessages.values()) {
chatEl.value += `${chatMsg.participant?.identity}${chatMsg.participant instanceof LocalParticipant ? ' (me)' : ''}: ${chatMsg.text}\n`;
}
}

async function sendGreetingTo(participant: Participant) {
const greeting = `Hello new participant ${participant.identity}. This is just an progressively updating chat message from me, participant ${currentRoom?.localParticipant.identity}.`;

const streamWriter = await currentRoom!.localParticipant.streamText({
topic: 'chat',
destinationIdentities: [participant.identity],
});

for (const char of greeting) {
await streamWriter.write(char);
await sleep(20);
}
await streamWriter.close();
}

function participantConnected(participant: Participant) {
async function participantConnected(participant: Participant) {
appendLog('participant', participant.identity, 'connected', participant.metadata);
participant
.on(ParticipantEvent.TrackMuted, (pub: TrackPublication) => {
Expand All @@ -515,6 +588,8 @@ function participantConnected(participant: Participant) {
.on(ParticipantEvent.ConnectionQualityChanged, () => {
renderParticipant(participant);
});

await sendGreetingTo(participant);
}

function participantDisconnected(participant: RemoteParticipant) {
Expand Down Expand Up @@ -803,14 +878,14 @@ function renderBitrate() {
}
}

function getParticipantsAreaElement() {
function getParticipantsAreaElement(): HTMLElement {
return (
window.documentPictureInPicture?.window?.document.querySelector('#participants-area') ||
$('participants-area')
);
}

function updateVideoSize(element: HTMLVideoElement, target: HTMLElement) {
function updateVideoSize(element: HTMLVideoElement, target: Element) {
target.innerHTML = `(${element.videoWidth}x${element.videoHeight})`;
}

Expand Down
2 changes: 2 additions & 0 deletions examples/demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ <h2>Livekit Sample App</h2>
>
Ratchet Key
</button>
<input id="file" type="file" />
<button onclick="appActions.sendFile()">Send file</button>
<select
id="simulate-scenario"
class="custom-select"
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
},
"dependencies": {
"@livekit/mutex": "1.1.1",
"@livekit/protocol": "1.30.0",
"@livekit/protocol": "1.32.0",
"events": "^3.3.0",
"loglevel": "^1.8.0",
"sdp-transform": "^2.14.1",
Expand Down
18 changes: 9 additions & 9 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export type {
SimulationScenario,
TranscriptionSegment,
ChatMessage,
SendTextOptions,
} from './room/types';
export * from './version';
export {
Expand Down
16 changes: 16 additions & 0 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,22 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
};

waitForBufferStatusLow(kind: DataPacket_Kind): Promise<void> {
return new Promise(async (resolve, reject) => {
if (this.isBufferStatusLow(kind)) {
resolve();
} else {
const onClosing = () => reject('Engine closed');
this.once(EngineEvent.Closing, onClosing);
while (!this.dcBufferStatus.get(kind)) {
await sleep(10);
}
this.off(EngineEvent.Closing, onClosing);
resolve();
}
});
}

/**
* @internal
*/
Expand Down
Loading