Skip to content

Commit

Permalink
add queueing to audiosource
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Sep 18, 2024
1 parent 8a544e6 commit 99abc28
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 23 deletions.
56 changes: 53 additions & 3 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,45 @@ import type {
CaptureAudioFrameCallback,
CaptureAudioFrameResponse,
NewAudioSourceResponse,
} from './proto/audio_frame_pb.js';

ClearAudioBufferResponse} from './proto/audio_frame_pb.js';
import {
AudioSourceType,
CaptureAudioFrameRequest,
ClearAudioBufferRequest,
NewAudioSourceRequest,
} from './proto/audio_frame_pb.js';
import { Queue } from './utils.js';

export class AudioSource {
/** @internal */
info: AudioSourceInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
lastCapture: number;
/** @internal */
currentQueueSize: number;
/** @internal */
releaseQueue = new Queue<void>();

sampleRate: number;
numChannels: number;
queueSize: number;

constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) {
constructor(sampleRate: number, numChannels: number, queueSize = 1000) {
this.sampleRate = sampleRate;
this.numChannels = numChannels;
this.queueSize = queueSize;

this.lastCapture = 0;
this.currentQueueSize = 0;

const req = new NewAudioSourceRequest({
type: AudioSourceType.AUDIO_SOURCE_NATIVE,
sampleRate: sampleRate,
numChannels: numChannels,
enableQueue: enableQueue,
queueSizeMs: queueSize,
});

const res = FfiClient.instance.request<NewAudioSourceResponse>({
Expand All @@ -47,7 +61,43 @@ export class AudioSource {
this.ffiHandle = new FfiHandle(res.source.handle.id);
}

/**
 * Estimated amount of audio still queued for playout, derived from the
 * running queue-size counter minus the wall-clock time elapsed since the
 * last captureFrame() call; never negative.
 *
 * NOTE(review): captureFrame() appears to add frame duration in seconds
 * (samplesPerChannel / sampleRate) while Date.now() deltas are in
 * milliseconds — confirm the intended unit of currentQueueSize.
 */
get queuedDuration(): number {
  const elapsed = Date.now() - this.lastCapture;
  const remaining = this.currentQueueSize - elapsed;
  return remaining > 0 ? remaining : 0;
}

/**
 * Drops any audio frames still buffered on the FFI side for this source,
 * then releases pending waitForPlayout() callers via the release queue.
 */
clearQueue() {
  FfiClient.instance.request<ClearAudioBufferResponse>({
    message: {
      case: 'clearAudioBuffer',
      value: new ClearAudioBufferRequest({
        sourceHandle: this.ffiHandle.handle,
      }),
    },
  });

  // The buffer is gone, so anyone awaiting playout can be unblocked now.
  this.releaseQueue.put();
}

async waitForPlayout() {
await this.releaseQueue.get().then(() => {
this.lastCapture = 0
this.currentQueueSize = 0;
})
}

async captureFrame(frame: AudioFrame) {
const now = Date.now()
const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture;
this.currentQueueSize += frame.samplesPerChannel / frame.sampleRate - elapsed

// remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks)
this.currentQueueSize -= 0.05
this.lastCapture = now

setTimeout(this.releaseQueue.put, this.currentQueueSize)

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
buffer: frame.protoInfo(),
Expand Down
74 changes: 71 additions & 3 deletions packages/livekit-rtc/src/proto/audio_frame_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
numChannels = 0;

/**
* @generated from field: optional bool enable_queue = 5;
* @generated from field: uint32 queue_size_ms = 5;
*/
enableQueue?: boolean;
queueSizeMs = 0;

constructor(data?: PartialMessage<NewAudioSourceRequest>) {
super();
Expand All @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
{ no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true },
{ no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true },
{ no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): NewAudioSourceRequest {
Expand Down Expand Up @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message<CaptureAudioFrameCallback
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferRequest
*/
/**
 * FFI request asking the native layer to clear the audio buffer of the
 * source identified by `sourceHandle` (sent by AudioSource.clearQueue()).
 *
 * NOTE(review): protoc-gen-es generated code — regenerate from the .proto
 * definition instead of editing by hand.
 */
export class ClearAudioBufferRequest extends Message<ClearAudioBufferRequest> {
  /**
   * @generated from field: uint64 source_handle = 1;
   */
  sourceHandle = protoInt64.zero;

  constructor(data?: PartialMessage<ClearAudioBufferRequest>) {
    super();
    proto3.util.initPartial(data, this);
  }

  static readonly runtime: typeof proto3 = proto3;
  static readonly typeName = "livekit.proto.ClearAudioBufferRequest";
  static readonly fields: FieldList = proto3.util.newFieldList(() => [
    { no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ },
  ]);

  // Standard protoc-gen-es deserialization helpers (binary wire format / JSON).
  static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferRequest {
    return new ClearAudioBufferRequest().fromBinary(bytes, options);
  }

  static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
    return new ClearAudioBufferRequest().fromJson(jsonValue, options);
  }

  static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
    return new ClearAudioBufferRequest().fromJsonString(jsonString, options);
  }

  // Structural equality via the protobuf runtime.
  static equals(a: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined, b: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined): boolean {
    return proto3.util.equals(ClearAudioBufferRequest, a, b);
  }
}

/**
* @generated from message livekit.proto.ClearAudioBufferResponse
*/
/**
 * Empty acknowledgement for ClearAudioBufferRequest — carries no fields.
 *
 * NOTE(review): protoc-gen-es generated code — regenerate from the .proto
 * definition instead of editing by hand.
 */
export class ClearAudioBufferResponse extends Message<ClearAudioBufferResponse> {
  constructor(data?: PartialMessage<ClearAudioBufferResponse>) {
    super();
    proto3.util.initPartial(data, this);
  }

  static readonly runtime: typeof proto3 = proto3;
  static readonly typeName = "livekit.proto.ClearAudioBufferResponse";
  // No fields: this message is a bare acknowledgement.
  static readonly fields: FieldList = proto3.util.newFieldList(() => [
  ]);

  // Standard protoc-gen-es deserialization helpers (binary wire format / JSON).
  static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferResponse {
    return new ClearAudioBufferResponse().fromBinary(bytes, options);
  }

  static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
    return new ClearAudioBufferResponse().fromJson(jsonValue, options);
  }

  static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
    return new ClearAudioBufferResponse().fromJsonString(jsonString, options);
  }

  // Structural equality via the protobuf runtime.
  static equals(a: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined, b: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined): boolean {
    return proto3.util.equals(ClearAudioBufferResponse, a, b);
  }
}

/**
* Create a new AudioResampler
*
Expand Down
48 changes: 31 additions & 17 deletions packages/livekit-rtc/src/proto/ffi_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Message, proto3, protoInt64 } from "@bufbuild/protobuf";
import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js";
import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, TrackEvent } from "./track_pb.js";
import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js";
import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js";
import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js";
import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js";

/**
Expand Down Expand Up @@ -239,25 +239,31 @@ export class FfiRequest extends Message<FfiRequest> {
case: "captureAudioFrame";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 28;
* @generated from field: livekit.proto.ClearAudioBufferRequest clear_audio_buffer = 28;
*/
value: ClearAudioBufferRequest;
case: "clearAudioBuffer";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 29;
*/
value: NewAudioResamplerRequest;
case: "newAudioResampler";
} | {
/**
* @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 29;
* @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 30;
*/
value: RemixAndResampleRequest;
case: "remixAndResample";
} | {
/**
* @generated from field: livekit.proto.E2eeRequest e2ee = 30;
* @generated from field: livekit.proto.E2eeRequest e2ee = 31;
*/
value: E2eeRequest;
case: "e2ee";
} | {
/**
* @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 31;
* @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 32;
*/
value: AudioStreamFromParticipantRequest;
case: "audioStreamFromParticipant";
Expand Down Expand Up @@ -297,10 +303,11 @@ export class FfiRequest extends Message<FfiRequest> {
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamRequest, oneof: "message" },
{ no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceRequest, oneof: "message" },
{ no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameRequest, oneof: "message" },
{ no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" },
{ no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" },
{ no: 30, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" },
{ no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" },
{ no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferRequest, oneof: "message" },
{ no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" },
{ no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" },
{ no: 31, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" },
{ no: 32, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): FfiRequest {
Expand Down Expand Up @@ -495,25 +502,31 @@ export class FfiResponse extends Message<FfiResponse> {
case: "captureAudioFrame";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 28;
* @generated from field: livekit.proto.ClearAudioBufferResponse clear_audio_buffer = 28;
*/
value: ClearAudioBufferResponse;
case: "clearAudioBuffer";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 29;
*/
value: NewAudioResamplerResponse;
case: "newAudioResampler";
} | {
/**
* @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 29;
* @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 30;
*/
value: RemixAndResampleResponse;
case: "remixAndResample";
} | {
/**
* @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 30;
* @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 31;
*/
value: AudioStreamFromParticipantResponse;
case: "audioStreamFromParticipant";
} | {
/**
* @generated from field: livekit.proto.E2eeResponse e2ee = 31;
* @generated from field: livekit.proto.E2eeResponse e2ee = 32;
*/
value: E2eeResponse;
case: "e2ee";
Expand Down Expand Up @@ -553,10 +566,11 @@ export class FfiResponse extends Message<FfiResponse> {
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamResponse, oneof: "message" },
{ no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceResponse, oneof: "message" },
{ no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameResponse, oneof: "message" },
{ no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" },
{ no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" },
{ no: 30, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" },
{ no: 31, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" },
{ no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferResponse, oneof: "message" },
{ no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" },
{ no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" },
{ no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" },
{ no: 32, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): FfiResponse {
Expand Down
33 changes: 33 additions & 0 deletions packages/livekit-rtc/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

import EventEmitter, { once } from "events";

/** @internal */
/**
 * @internal
 * Async FIFO queue backed by an EventEmitter. `get()` blocks until an item
 * is available; `put()` blocks while the queue is at its (optional) limit.
 */
export class Queue<T> {
  #items: T[] = [];
  #limit?: number;
  #events = new EventEmitter();

  /** @param limit - optional maximum number of queued items; unbounded if omitted */
  constructor(limit?: number) {
    this.#limit = limit;
  }

  /** Removes and returns the oldest item, waiting until one is available. */
  async get(): Promise<T> {
    // BUGFIX: re-check emptiness after every wake-up (was a single `if`).
    // With multiple concurrent getters, the item that triggered the 'put'
    // event may already have been consumed by another waiter, in which case
    // the original shift()! silently returned undefined.
    while (this.#items.length === 0) {
      await once(this.#events, 'put');
    }
    const item = this.#items.shift()!;
    this.#events.emit('get');
    return item;
  }

  /** Appends an item, waiting while the queue is at its limit. */
  async put(item: T): Promise<void> {
    // Same single-check race as get(): loop until there is actually room,
    // since another blocked producer may fill the freed slot first.
    while (this.#limit && this.#items.length >= this.#limit) {
      await once(this.#events, 'get');
    }
    this.#items.push(item);
    this.#events.emit('put');
  }
}

0 comments on commit 99abc28

Please sign in to comment.