From 99abc28e0441daa11da1b572ce310ce897917b87 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 18 Sep 2024 14:35:49 -0700 Subject: [PATCH] add queueing to audiosource --- packages/livekit-rtc/src/audio_source.ts | 56 +++++++++++++- .../livekit-rtc/src/proto/audio_frame_pb.ts | 74 ++++++++++++++++++- packages/livekit-rtc/src/proto/ffi_pb.ts | 48 +++++++----- packages/livekit-rtc/src/utils.ts | 33 +++++++++ 4 files changed, 188 insertions(+), 23 deletions(-) create mode 100644 packages/livekit-rtc/src/utils.ts diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 0af142cb..7d0024f9 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -9,31 +9,45 @@ import type { CaptureAudioFrameCallback, CaptureAudioFrameResponse, NewAudioSourceResponse, -} from './proto/audio_frame_pb.js'; + + ClearAudioBufferResponse} from './proto/audio_frame_pb.js'; import { AudioSourceType, CaptureAudioFrameRequest, + ClearAudioBufferRequest, NewAudioSourceRequest, } from './proto/audio_frame_pb.js'; +import { Queue } from './utils.js'; export class AudioSource { /** @internal */ info: AudioSourceInfo; /** @internal */ ffiHandle: FfiHandle; + /** @internal */ + lastCapture: number; + /** @internal */ + currentQueueSize: number; + /** @internal */ + releaseQueue = new Queue(); sampleRate: number; numChannels: number; + queueSize: number; - constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) { + constructor(sampleRate: number, numChannels: number, queueSize = 1000) { this.sampleRate = sampleRate; this.numChannels = numChannels; + this.queueSize = queueSize; + + this.lastCapture = 0; + this.currentQueueSize = 0; const req = new NewAudioSourceRequest({ type: AudioSourceType.AUDIO_SOURCE_NATIVE, sampleRate: sampleRate, numChannels: numChannels, - enableQueue: enableQueue, + queueSizeMs: queueSize, }); const res = FfiClient.instance.request({ @@ -47,7 +61,43 @@ export class AudioSource { this.ffiHandle = new FfiHandle(res.source.handle.id); } + get queuedDuration(): number { + return Math.max(this.currentQueueSize - Date.now() + this.lastCapture, 0); + } + + clearQueue() { + const req = new ClearAudioBufferRequest({ + sourceHandle: this.ffiHandle.handle, + }); + + FfiClient.instance.request({ + message: { + case: 'clearAudioBuffer', + value: req, + }, + }); + + this.releaseQueue.put(); + } + + async waitForPlayout() { + await this.releaseQueue.get().then(() => { + this.lastCapture = 0 + this.currentQueueSize = 0; + }) + } + async captureFrame(frame: AudioFrame) { + const now = Date.now() + const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture; + this.currentQueueSize += frame.samplesPerChannel / frame.sampleRate - elapsed + + // remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks) + this.currentQueueSize -= 0.05 + this.lastCapture = now + + setTimeout(this.releaseQueue.put, this.currentQueueSize) + const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, buffer: frame.protoInfo(), diff --git a/packages/livekit-rtc/src/proto/audio_frame_pb.ts b/packages/livekit-rtc/src/proto/audio_frame_pb.ts index c680608b..597dbdd4 100644 --- a/packages/livekit-rtc/src/proto/audio_frame_pb.ts +++ b/packages/livekit-rtc/src/proto/audio_frame_pb.ts @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message { numChannels = 0; /** - * @generated from field: optional bool enable_queue = 5; + * @generated from field: uint32 queue_size_ms = 5; */ - enableQueue?: boolean; + queueSizeMs = 0; constructor(data?: PartialMessage) { super(); @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message { { no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true }, { no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ }, { no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ }, - { no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true }, + { no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): NewAudioSourceRequest { @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message { + /** + * @generated from field: uint64 source_handle = 1; + */ + sourceHandle = protoInt64.zero; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "livekit.proto.ClearAudioBufferRequest"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ClearAudioBufferRequest { + return new ClearAudioBufferRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ClearAudioBufferRequest { + return new ClearAudioBufferRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ClearAudioBufferRequest { + return new ClearAudioBufferRequest().fromJsonString(jsonString, options); + } + + static equals(a: ClearAudioBufferRequest | PlainMessage | undefined, b: ClearAudioBufferRequest | PlainMessage | undefined): boolean { + return proto3.util.equals(ClearAudioBufferRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.ClearAudioBufferResponse + */ +export class ClearAudioBufferResponse extends Message { + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "livekit.proto.ClearAudioBufferResponse"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ClearAudioBufferResponse { + return new ClearAudioBufferResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ClearAudioBufferResponse { + return new ClearAudioBufferResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ClearAudioBufferResponse { + return new ClearAudioBufferResponse().fromJsonString(jsonString, options); + } + + static equals(a: ClearAudioBufferResponse | PlainMessage | undefined, b: ClearAudioBufferResponse | PlainMessage | undefined): boolean { + return proto3.util.equals(ClearAudioBufferResponse, a, b); + } +} + /** * Create a new AudioResampler * diff --git a/packages/livekit-rtc/src/proto/ffi_pb.ts b/packages/livekit-rtc/src/proto/ffi_pb.ts index 5646abf8..e74b4809 100644 --- a/packages/livekit-rtc/src/proto/ffi_pb.ts +++ b/packages/livekit-rtc/src/proto/ffi_pb.ts @@ -22,7 +22,7 @@ import { Message, proto3, protoInt64 } from "@bufbuild/protobuf"; import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js"; import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, TrackEvent } from "./track_pb.js"; import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js"; -import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; +import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js"; /** @@ -239,25 +239,31 @@ export class FfiRequest extends Message { case: "captureAudioFrame"; } | { /** - * @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 28; + * @generated from field: livekit.proto.ClearAudioBufferRequest clear_audio_buffer = 28; + */ + value: ClearAudioBufferRequest; + case: "clearAudioBuffer"; + } | { + /** + * @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 29; */ value: NewAudioResamplerRequest; case: "newAudioResampler"; } | { /** - * @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 29; + * @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 30; */ value: RemixAndResampleRequest; case: "remixAndResample"; } | { /** - * @generated from field: livekit.proto.E2eeRequest e2ee = 30; + * @generated from field: livekit.proto.E2eeRequest e2ee = 31; */ value: E2eeRequest; case: "e2ee"; } | { /** - * @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 31; + * @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 32; */ value: AudioStreamFromParticipantRequest; case: "audioStreamFromParticipant"; @@ -297,10 +303,11 @@ export class FfiRequest extends Message { { no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamRequest, oneof: "message" }, { no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceRequest, oneof: "message" }, { no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameRequest, oneof: "message" }, - { no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" }, - { no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" }, - { no: 30, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" }, - { no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" }, + { no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferRequest, oneof: "message" }, + { no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" }, + { no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" }, + { no: 31, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" }, + { no: 32, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiRequest { @@ -495,25 +502,31 @@ export class FfiResponse extends Message { case: "captureAudioFrame"; } | { /** - * @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 28; + * @generated from field: livekit.proto.ClearAudioBufferResponse clear_audio_buffer = 28; + */ + value: ClearAudioBufferResponse; + case: "clearAudioBuffer"; + } | { + /** + * @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 29; */ value: NewAudioResamplerResponse; case: "newAudioResampler"; } | { /** - * @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 29; + * @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 30; */ value: RemixAndResampleResponse; case: "remixAndResample"; } | { /** - * @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 30; + * @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 31; */ value: AudioStreamFromParticipantResponse; case: "audioStreamFromParticipant"; } | { /** - * @generated from field: livekit.proto.E2eeResponse e2ee = 31; + * @generated from field: livekit.proto.E2eeResponse e2ee = 32; */ value: E2eeResponse; case: "e2ee"; @@ -553,10 +566,11 @@ export class FfiResponse extends Message { { no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamResponse, oneof: "message" }, { no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceResponse, oneof: "message" }, { no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameResponse, oneof: "message" }, - { no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" }, - { no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" }, - { no: 30, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" }, - { no: 31, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" }, + { no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferResponse, oneof: "message" }, + { no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" }, + { no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" }, + { no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" }, + { no: 32, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiResponse { diff --git a/packages/livekit-rtc/src/utils.ts b/packages/livekit-rtc/src/utils.ts new file mode 100644 index 00000000..c6167d66 --- /dev/null +++ b/packages/livekit-rtc/src/utils.ts @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +import EventEmitter, { once } from "events"; + +/** @internal */ +export class Queue { + #items: T[] = []; + #limit?: number; + #events = new EventEmitter(); + + constructor(limit?: number) { + this.#limit = limit; + } + + async get(): Promise { + if (this.#items.length === 0) { + await once(this.#events, 'put'); + } + const item = this.#items.shift()!; + this.#events.emit('get'); + return item; + } + + async put(item: T) { + if (this.#limit && this.#items.length >= this.#limit) { + await once(this.#events, 'get'); + } + this.#items.push(item); + this.#events.emit('put'); + } +}