Skip to content

Commit

Permalink
rtc: bump ffi to v0.10.2 (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp authored Sep 20, 2024
1 parent 0e21559 commit c6766b8
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 58 deletions.
5 changes: 5 additions & 0 deletions .changeset/strong-dodos-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

bump ffi to v0.10.2
7 changes: 3 additions & 4 deletions examples/publish-wav/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ const track = LocalAudioTrack.createAudioTrack('audio', source);
const options = new TrackPublishOptions();
const buffer = new Int16Array(sample.buffer);
options.source = TrackSource.SOURCE_MICROPHONE;
await room.localParticipant.publishTrack(track, options);
await new Promise((resolve) => setTimeout(resolve, 1000)); // wait a bit so the start doesn't cut off
await room.localParticipant.publishTrack(track, options).then((pub) => pub.waitForSubscription());

let written = 44; // start of WAVE data stream
const FRAME_DURATION = 1; // write 1s of audio at a time
const numSamples = sampleRate / FRAME_DURATION;
const numSamples = sampleRate * FRAME_DURATION;
while (written < dataSize) {
const available = dataSize - written;
const frameSize = Math.min(numSamples, available);
Expand All @@ -61,9 +60,9 @@ while (written < dataSize) {
Math.trunc(frameSize / channels),
);
await source.captureFrame(frame);

written += frameSize;
}
await source.waitForPlayout();

await room.disconnect();
await dispose();
2 changes: 1 addition & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"dependencies": {
"@bufbuild/protobuf": "^1.4.2",
"typed-emitter": "^2.1.0"
"@livekit/typed-emitter": "^3.0.0"
},
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
Expand Down
71 changes: 69 additions & 2 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import type {
AudioSourceInfo,
CaptureAudioFrameCallback,
CaptureAudioFrameResponse,
ClearAudioBufferResponse,
NewAudioSourceResponse,
} from './proto/audio_frame_pb.js';
import {
AudioSourceType,
CaptureAudioFrameRequest,
ClearAudioBufferRequest,
NewAudioSourceRequest,
} from './proto/audio_frame_pb.js';

Expand All @@ -21,19 +23,33 @@ export class AudioSource {
info: AudioSourceInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
lastCapture: number;
/** @internal */
currentQueueSize: number;
/** @internal */
release = () => {};
promise = this.newPromise();
/** @internal */
timeout?: ReturnType<typeof setTimeout> = undefined;

sampleRate: number;
numChannels: number;
queueSize: number;

constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) {
constructor(sampleRate: number, numChannels: number, queueSize = 1000) {
this.sampleRate = sampleRate;
this.numChannels = numChannels;
this.queueSize = queueSize;

this.lastCapture = 0;
this.currentQueueSize = 0;

const req = new NewAudioSourceRequest({
type: AudioSourceType.AUDIO_SOURCE_NATIVE,
sampleRate: sampleRate,
numChannels: numChannels,
enableQueue: enableQueue,
queueSizeMs: queueSize,
});

const res = FfiClient.instance.request<NewAudioSourceResponse>({
Expand All @@ -47,7 +63,58 @@ export class AudioSource {
this.ffiHandle = new FfiHandle(res.source.handle.id);
}

get queuedDuration(): number {
return Math.max(
this.currentQueueSize - Number(process.hrtime.bigint() / 1000000n) + this.lastCapture,
0,
);
}

clearQueue() {
const req = new ClearAudioBufferRequest({
sourceHandle: this.ffiHandle.handle,
});

FfiClient.instance.request<ClearAudioBufferResponse>({
message: {
case: 'clearAudioBuffer',
value: req,
},
});

this.release();
}

/** @internal */
async newPromise() {
return new Promise<void>((resolve) => {
this.release = resolve;
});
}

async waitForPlayout() {
return this.promise.then(() => {
this.lastCapture = 0;
this.currentQueueSize = 0;
this.promise = this.newPromise();
});
}

async captureFrame(frame: AudioFrame) {
const now = Number(process.hrtime.bigint() / 1000000n);
const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture;
this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000;

this.lastCapture = now;

if (this.timeout) {
clearTimeout(this.timeout);
}

// remove 50ms to account for processing time
// (e.g. using wait_for_playout for very small chunks)
this.timeout = setTimeout(this.release, this.currentQueueSize - 50);

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
buffer: frame.protoInfo(),
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/ffi_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { PartialMessage } from '@bufbuild/protobuf';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';
import {
FfiHandle,
livekitCopyBuffer,
Expand Down
13 changes: 5 additions & 8 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

/* auto-generated by NAPI-RS */

export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
74 changes: 71 additions & 3 deletions packages/livekit-rtc/src/proto/audio_frame_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
numChannels = 0;

/**
* @generated from field: optional bool enable_queue = 5;
* @generated from field: uint32 queue_size_ms = 5;
*/
enableQueue?: boolean;
queueSizeMs = 0;

constructor(data?: PartialMessage<NewAudioSourceRequest>) {
super();
Expand All @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
{ no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true },
{ no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true },
{ no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): NewAudioSourceRequest {
Expand Down Expand Up @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message<CaptureAudioFrameCallback
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferRequest
*/
export class ClearAudioBufferRequest extends Message<ClearAudioBufferRequest> {
/**
* @generated from field: uint64 source_handle = 1;
*/
sourceHandle = protoInt64.zero;

constructor(data?: PartialMessage<ClearAudioBufferRequest>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferRequest";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined, b: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferRequest, a, b);
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferResponse
*/
export class ClearAudioBufferResponse extends Message<ClearAudioBufferResponse> {
constructor(data?: PartialMessage<ClearAudioBufferResponse>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferResponse";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined, b: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferResponse, a, b);
}
}

/**
* Create a new AudioResampler
*
Expand Down
Loading

0 comments on commit c6766b8

Please sign in to comment.