From e0722c1020ac308a8d7c5d55d495931319b0aeae Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 23 Sep 2024 18:05:09 +0200 Subject: [PATCH] Ensure republishing is finished when calling setTrackEnabled methods (#1250) * Ensure republishing is finished when calling setTrackEnabled methods * Create loud-carrots-agree.md --- .changeset/loud-carrots-agree.md | 5 + src/room/participant/LocalParticipant.ts | 131 +++++++++++++++-------- 2 files changed, 93 insertions(+), 43 deletions(-) create mode 100644 .changeset/loud-carrots-agree.md diff --git a/.changeset/loud-carrots-agree.md b/.changeset/loud-carrots-agree.md new file mode 100644 index 0000000000..5d55659038 --- /dev/null +++ b/.changeset/loud-carrots-agree.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Ensure republishing is finished when calling setTrackEnabled methods diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 729707c2ff..2974910c7d 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -88,6 +88,8 @@ export default class LocalParticipant extends Participant { private pendingPublishPromises = new Map>(); + private republishPromise: Promise | undefined; + private cameraError: Error | undefined; private microphoneError: Error | undefined; @@ -380,6 +382,9 @@ export default class LocalParticipant extends Participant { publishOptions?: TrackPublishOptions, ) { this.log.debug('setTrackEnabled', { ...this.logContext, source, enabled }); + if (this.republishPromise) { + await this.republishPromise; + } let track = this.getTrackPublication(source); if (enabled) { if (track) { @@ -387,9 +392,12 @@ export default class LocalParticipant extends Participant { } else { let localTracks: Array | undefined; if (this.pendingPublishing.has(source)) { - this.log.info('skipping duplicate published source', { ...this.logContext, source }); - // no-op it's already been requested - return; + const pendingTrack = await this.waitForPendingPublicationOfSource(source); + if (!pendingTrack) { + this.log.info('skipping duplicate published source', { ...this.logContext, source }); + } + await pendingTrack?.unmute(); + return pendingTrack; } this.pendingPublishing.add(source); try { @@ -437,16 +445,22 @@ export default class LocalParticipant extends Participant { this.pendingPublishing.delete(source); } } - } else if (track && track.track) { - // screenshare cannot be muted, unpublish instead - if (source === Track.Source.ScreenShare) { - track = await this.unpublishTrack(track.track); - const screenAudioTrack = this.getTrackPublication(Track.Source.ScreenShareAudio); - if (screenAudioTrack && screenAudioTrack.track) { - this.unpublishTrack(screenAudioTrack.track); + } else { + if (!track?.track) { + // if there's no track available yet first wait for pending publishing promises of that source to see if it becomes available + track = await this.waitForPendingPublicationOfSource(source); + } + if (track && track.track) { + // screenshare cannot be muted, unpublish instead + if (source === Track.Source.ScreenShare) { + track = await this.unpublishTrack(track.track); + const screenAudioTrack = this.getTrackPublication(Track.Source.ScreenShareAudio); + if (screenAudioTrack && screenAudioTrack.track) { + this.unpublishTrack(screenAudioTrack.track); + } + } else { + await track.mute(); } - } else { - await track.mute(); } } return track; @@ -611,15 +625,23 @@ export default class LocalParticipant extends Participant { * @param track * @param options */ - async publishTrack( + async publishTrack(track: LocalTrack | MediaStreamTrack, options?: TrackPublishOptions) { + return this.publishOrRepublishTrack(track, options); + } + + private async publishOrRepublishTrack( track: LocalTrack | MediaStreamTrack, options?: TrackPublishOptions, + isRepublish = false, ): Promise { if (track instanceof LocalAudioTrack) { track.setAudioContext(this.audioContext); } await this.reconnectFuture?.promise; + if (this.republishPromise && !isRepublish) { + await this.republishPromise; + } if (track instanceof LocalTrack && this.pendingPublishPromises.has(track)) { await this.pendingPublishPromises.get(track); } @@ -1248,39 +1270,53 @@ export default class LocalParticipant extends Participant { } async republishAllTracks(options?: TrackPublishOptions, restartTracks: boolean = true) { - const localPubs: LocalTrackPublication[] = []; - this.trackPublications.forEach((pub) => { - if (pub.track) { - if (options) { - pub.options = { ...pub.options, ...options }; - } - localPubs.push(pub); + if (this.republishPromise) { + await this.republishPromise; + } + this.republishPromise = new Promise(async (resolve, reject) => { + try { + const localPubs: LocalTrackPublication[] = []; + this.trackPublications.forEach((pub) => { + if (pub.track) { + if (options) { + pub.options = { ...pub.options, ...options }; + } + localPubs.push(pub); + } + }); + + await Promise.all( + localPubs.map(async (pub) => { + const track = pub.track!; + await this.unpublishTrack(track, false); + if ( + restartTracks && + !track.isMuted && + track.source !== Track.Source.ScreenShare && + track.source !== Track.Source.ScreenShareAudio && + (track instanceof LocalAudioTrack || track instanceof LocalVideoTrack) && + !track.isUserProvided + ) { + // generally we need to restart the track before publishing, often a full reconnect + // is necessary because computer had gone to sleep. + this.log.debug('restarting existing track', { + ...this.logContext, + track: pub.trackSid, + }); + await track.restartTrack(); + } + await this.publishOrRepublishTrack(track, pub.options, true); + }), + ); + resolve(); + } catch (error: any) { + reject(error); + } finally { + this.republishPromise = undefined; } }); - await Promise.all( - localPubs.map(async (pub) => { - const track = pub.track!; - await this.unpublishTrack(track, false); - if ( - restartTracks && - !track.isMuted && - track.source !== Track.Source.ScreenShare && - track.source !== Track.Source.ScreenShareAudio && - (track instanceof LocalAudioTrack || track instanceof LocalVideoTrack) && - !track.isUserProvided - ) { - // generally we need to restart the track before publishing, often a full reconnect - // is necessary because computer had gone to sleep. - this.log.debug('restarting existing track', { - ...this.logContext, - track: pub.trackSid, - }); - await track.restartTrack(); - } - await this.publishTrack(track, pub.options); - }), - ); + await this.republishPromise; } /** @@ -1571,4 +1607,13 @@ export default class LocalParticipant extends Participant { }); return publication; } + + private async waitForPendingPublicationOfSource(source: Track.Source) { + const publishPromiseEntry = Array.from(this.pendingPublishPromises.entries()).find( + ([pendingTrack]) => pendingTrack.source === source, + ); + if (publishPromiseEntry) { + return publishPromiseEntry[1]; + } + } }