Skip to content

Commit

Permalink
feat: Распознавание в web-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sasha-tlt committed Sep 3, 2024
1 parent e4994e1 commit b7e5a11
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 67 deletions.
6 changes: 4 additions & 2 deletions rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import copy from 'rollup-plugin-copy';
import pkg from './package.json';

const common = {
input: 'src/index.ts',
plugins: [
commonjs({
extensions: ['.js', '.jsx', '.ts', '.tsx'],
Expand Down Expand Up @@ -44,7 +43,7 @@ const getUmdConfig = (fileName, input) => ({
export default [
{
...common,
input: ['src/index.ts', 'src/createAssistantDevOrigin.ts'],
input: ['src/index.ts', 'src/createAssistantDevOrigin.ts','src/assistantSdk/voice/listener/worklet.js','src/assistantSdk/listenSdk/vps.worker.ts',],
output: {
...common.output,
dir: 'dist',
Expand Down Expand Up @@ -76,6 +75,9 @@ export default [
'src/assistantSdk/assistant.ts',
'src/mock.ts',
'src/index.ts',
'src/assistantSdk/voice/listener/worklet.js',
'src/assistantSdk/listenSdk/vps.worker.ts',
'src/assistantSdk/listenSdk/listenSdk.ts',
],
output: {
...common.output,
Expand Down
6 changes: 3 additions & 3 deletions src/assistantSdk/client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@ export const createProtocol = (

status = 'connected';

window.clearTimeout(clearReadyTimer);
clearTimeout(clearReadyTimer);

/// считаем коннект = ready, если по истечении таймаута сокет не был разорван
/// т.к бек может разрывать сокет, если с settings что-то не так
clearReadyTimer = window.setTimeout(() => {
clearReadyTimer = setTimeout(() => {
if (status !== 'connected') {
return;
}
Expand Down Expand Up @@ -349,7 +349,7 @@ export const createProtocol = (
},
init: () => {
// в отличии от reconnect не обрывает коннект если он в порядке
if (status === 'ready' && window.navigator.onLine) {
if (status === 'ready' && (!window || window.navigator.onLine)) {
return Promise.resolve();
}

Expand Down
40 changes: 20 additions & 20 deletions src/assistantSdk/client/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ export const createTransport = ({ createWS = defaultWSCreator, checkCertUrl }: C
status = 'connecting';
emit('connecting');

if (!hasCert && window.navigator.onLine) {
const okay = await checkCert(checkCertUrl!);
// if (!hasCert && window?.navigator.onLine) {
// const okay = await checkCert(checkCertUrl!);

if (!okay) {
status = 'closed';
emit('close');
// if (!okay) {
// status = 'closed';
// emit('close');

emit('error', new Error('Cert authority invalid'));
// emit('error', new Error('Cert authority invalid'));

return;
}
// return;
// }

hasCert = true;
}
// hasCert = true;
// }

webSocket = createWS(url);

Expand All @@ -70,7 +70,7 @@ export const createTransport = ({ createWS = defaultWSCreator, checkCertUrl }: C
return;
}

window.clearTimeout(retryTimeoutId);
clearTimeout(retryTimeoutId);

retries = 0;

Expand All @@ -90,10 +90,10 @@ export const createTransport = ({ createWS = defaultWSCreator, checkCertUrl }: C

// пробуем переподключаться, если возникла ошибка при коннекте
if (!webSocket || (webSocket.readyState === 3 && !stopped)) {
window.clearTimeout(retryTimeoutId);
clearTimeout(retryTimeoutId);

if (retries < 2) {
retryTimeoutId = window.setTimeout(() => {
retryTimeoutId = setTimeout(() => {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
open(url);

Expand Down Expand Up @@ -129,25 +129,25 @@ export const createTransport = ({ createWS = defaultWSCreator, checkCertUrl }: C
return;
}

window.setTimeout(() => reconnect(url));
setTimeout(() => reconnect(url));

close();
};

const send = (data: Uint8Array) => {
if (!window.navigator.onLine) {
close();
emit('error');
throw new Error('The client seems to be offline');
}
// if (!window?.navigator.onLine) {
// close();
// emit('error');
// throw new Error('The client seems to be offline');
// }

webSocket.send(data);
};

return {
close,
get isOnline() {
return window.navigator.onLine;
return window?.navigator.onLine === true;
},
on,
open,
Expand Down
29 changes: 29 additions & 0 deletions src/assistantSdk/listenSdk/listenSdk.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type { VpsConfiguration } from '../../typings';
import { createNavigatorAudioProvider } from '../voice/listener/navigatorAudioProvider';
import { createVoiceListener } from '../voice/listener/voiceListener';

/**
 * Parameters accepted by `createInifiniteListen`.
 *
 * This is `VpsConfiguration` without `fakeVps`, `logger`, `getToken` and `vpsToken`,
 * extended with the two fields below. The whole object is posted to the worker inside
 * the `init` message.
 */
export type InifinteListenParams = Omit<VpsConfiguration, 'fakeVps' | 'logger' | 'getToken' | 'vpsToken'> & {
// Token string, supplied directly instead of the omitted `getToken` / `vpsToken` fields.
token: string;
// Arbitrary voice metadata, keyed by field name.
voiceMeta: Record<string, unknown>;
};

// Module worker created once, when this module is evaluated, and shared by every
// `createInifiniteListen` call. The script URL is resolved relative to this module.
const worker = new Worker(new URL('./vps.worker.js', import.meta.url), { type: 'module' });

/**
 * Sets up continuous listening whose audio is handed to the shared worker.
 *
 * A voice listener is created with an audio provider that delegates to
 * `createNavigatorAudioProvider`, plus a port callback that transfers the received
 * `MessagePort` to the worker in a `start` message. The worker is then sent an `init`
 * message carrying `config` and `restApiUrl`.
 *
 * @param config Listen parameters forwarded to the worker as the first `init` param.
 * @param restApiUrl URL forwarded to the worker as the second `init` param.
 * @returns Controls: `start` calls the listener's `listen()`, `stop` calls its `stop()`.
 */
export function createInifiniteListen(config: InifinteListenParams, restApiUrl: string) {
    // The port is listed as a transferable, so ownership moves to the worker.
    const forwardPortToWorker = (port: MessagePort) => {
        worker.postMessage({ type: 'start' }, [port]);
    };

    const voiceListener = createVoiceListener(
        (cb, ...rest) => createNavigatorAudioProvider(cb, ...rest),
        forwardPortToWorker,
    );

    worker.postMessage({ type: 'init', params: [config, restApiUrl] });

    const start = () => {
        voiceListener.listen();
    };

    const stop = () => {
        voiceListener.stop();
    };

    return { start, stop };
}
100 changes: 100 additions & 0 deletions src/assistantSdk/listenSdk/vps.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { v4 } from 'uuid';

import { createClient } from '../client/client';
import { createProtocol } from '../client/protocol';
import { createTransport } from '../client/transport';

/**
 * Produces a copy of `object` in which every kept value is a string.
 *
 * - Falsy values (`0`, `''`, `false`, `null`, `undefined`, `NaN`) are omitted from the result.
 * - A string that starts with `{` is copied unchanged.
 * - Every other value is passed through `JSON.stringify`.
 *
 * @param object Source record.
 * @returns A new record containing the stringified values of the truthy fields.
 */
function convertFieldValuesToString<
    Obj extends Record<string, unknown>,
    ObjStringified = { [key in keyof Obj]: string },
>(object: Obj): ObjStringified {
    const stringified: Record<string, string> = {};

    for (const fieldName of Object.keys(object)) {
        const value = object[fieldName];

        if (!value) {
            continue;
        }

        if (typeof value === 'string' && value.startsWith('{')) {
            // Copied as-is rather than being stringified a second time.
            stringified[fieldName] = value;
        } else {
            stringified[fieldName] = JSON.stringify(value);
        }
    }

    return stringified as ObjStringified;
}

let _client: ReturnType<typeof createClient>;
let _stream: (chunk: Uint8Array, last: boolean) => void;
let _push: (text: string) => void;

/**
 * Builds the recognition client for this worker and routes recognized text to a REST endpoint.
 *
 * Creates the transport / protocol / client chain, stores the client in `_client`, installs
 * `_push` (POSTs `{ sessionId, message, timestamp }` as JSON to `restApiUrl`) and subscribes
 * to the client's `stt` events.
 *
 * The first argument is the listen-parameters object: `token` is what `getToken` returns,
 * `voiceMeta` is stringified field by field, and the remaining fields are forwarded to
 * `createProtocol`.
 *
 * @param restApiUrl URL that receives every recognized phrase.
 */
function init({ token, voiceMeta, ...config }, restApiUrl: string) {
    // One id per init call; it is attached to every pushed message.
    const sessionId = v4();
    const convertedVoiceMeta = convertFieldValuesToString(voiceMeta);

    // NOTE(review): transport/protocol still reference `window` (`!window`, `window?.navigator`).
    // In a worker `window` is an undeclared identifier, so those expressions throw a
    // ReferenceError rather than evaluating to a falsy value. Confirm they are guarded with
    // `typeof window === 'undefined'` before relying on this code path.
    const transport = createTransport({});
    const protocol = createProtocol(transport, { ...config, getToken: () => token });

    _client = createClient(protocol, undefined, {
        getVoiceMeta: () => convertedVoiceMeta,
    });

    _push = (message: string) => {
        fetch(restApiUrl, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json;charset=utf-8',
            },
            body: JSON.stringify({
                sessionId,
                message,
                timestamp: new Date().toISOString(),
            }),
        }).catch((err) => {
            // Delivery is fire-and-forget: log the failure instead of leaving an
            // unhandled promise rejection in the worker.
            console.error(err);
        });
    };

    _client.on('stt', ({ text, response }) => {
        // When `text.data` is present it is pushed and `response` is ignored.
        if (text?.data) {
            _push(text.data);
            return;
        }

        if (response) {
            const { decoderResultField } = response;

            // Only the first hypothesis is pushed.
            if (decoderResultField?.hypothesis?.length) {
                _push(decoderResultField.hypothesis[0].normalizedText || '');
            }
        }
    });
}

/**
 * Opens a voice stream on the client and feeds it with the chunks arriving on `port`.
 *
 * Once `_client.init()` resolves, a voice stream is created and its `sendVoice` function is
 * stored in `_stream`. Messages received on `port` before that point are dropped because
 * `_stream` is still unset.
 *
 * @param port Message port delivering audio chunks (transferred from the main thread).
 */
function start(port: MessagePort) {
    // `start` may arrive before `init`; report it instead of failing with a TypeError.
    if (!_client) {
        console.error(new Error('Voice worker received "start" before "init"'));
        return;
    }

    _client
        .init()
        .then(() =>
            // Returned so that a failure while creating the stream reaches the `.catch` below
            // (presumably `createVoiceStream` returns a promise — harmless if it does not).
            _client.createVoiceStream(({ sendVoice }) => {
                _stream = sendVoice;

                return Promise.resolve();
            }, {}),
        )
        .catch((err) => {
            console.error(err);
        });

    port.onmessage = (event) => {
        const { data } = event;

        // `_stream` is typed to take a Uint8Array. The main-thread path wraps the raw buffer
        // the same way, so do it here too; any other payload is passed through untouched.
        // NOTE(review): assumes the worklet posts an ArrayBuffer — confirm against worklet.js.
        _stream?.(data instanceof ArrayBuffer ? new Uint8Array(data) : data, false);
    };
}

/**
 * Passes one audio chunk to the active voice stream; does nothing while no stream is set.
 *
 * @param chunk Audio data to send.
 * @param last Forwarded unchanged as the stream's second argument.
 */
function handleChunk(chunk: Uint8Array, last: boolean) {
    if (_stream) {
        _stream(chunk, last);
    }
}

// Entry point of the worker: dispatches incoming messages by their `type` field.
//   'init'  -> init(params[0], params[1])
//   'start' -> start(first transferred port)
//   'chunk' -> handleChunk(params[0], params[1])
//   'stop'  -> accepted, currently has no effect
// Any other type is ignored.
self.addEventListener('message', (event) => {
    const { type, params } = event.data;

    if (type === 'init') {
        init(params[0], params[1]);
    } else if (type === 'start') {
        start(event.ports[0]);
    } else if (type === 'chunk') {
        handleChunk(params[0], params[1]);
    }
});
53 changes: 28 additions & 25 deletions src/assistantSdk/voice/listener/navigatorAudioProvider.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import { createAudioContext } from '../audioContext';

import { worker } from './worker';

async function initWorklet(context: AudioContext) {
const blob: Blob = new Blob([worker], { type: 'application/javascript; charset=utf-8' });
const url: string = URL.createObjectURL(blob);

await context.audioWorklet.addModule(url);
URL.revokeObjectURL(url);
await context.audioWorklet.addModule(new URL('./worklet.js', import.meta.url));
}

const TARGET_SAMPLE_RATE = 16000;
Expand All @@ -27,8 +21,9 @@ let analyser: AnalyserNode | null = null;
*/
const createAudioRecorder = (
stream: MediaStream,
cb: (buffer: ArrayBuffer, analyserArray: Uint8Array | null, last: boolean) => void,
cb?: (buffer: ArrayBuffer, analyserArray: Uint8Array | null, last: boolean) => void,
useAnalyser?: boolean,
onGetPort?: (port: MessagePort) => void,
): Promise<() => void> =>
new Promise((resolve) => {
let state: 'inactive' | 'recording' = 'inactive';
Expand Down Expand Up @@ -65,21 +60,28 @@ const createAudioRecorder = (
analyser = context.createAnalyser();
}

pcmProcessingNode = new AudioWorkletNode(context, 'pcm-processor', { sampleRate: context.sampleRate });
pcmProcessingNode.port.onmessage = (e) => {
const { data } = e;
const last = state === 'inactive';

let analyserArray: Uint8Array | null = null;
if (analyser) {
analyserArray = new Uint8Array(analyser.frequencyBinCount);

analyser?.getByteTimeDomainData(analyserArray);
}

resolve(stop);
cb(data, analyserArray, last);
};
pcmProcessingNode = new AudioWorkletNode(context, 'pcm-processor', {
processorOptions: { sampleRate: context.sampleRate },
});
if (onGetPort) {
setTimeout(() => resolve(stop));
onGetPort(pcmProcessingNode.port);
} else {
pcmProcessingNode.port.onmessage = (e) => {
const { data } = e;
const last = state === 'inactive';

let analyserArray: Uint8Array | null = null;
if (analyser) {
analyserArray = new Uint8Array(analyser.frequencyBinCount);

analyser?.getByteTimeDomainData(analyserArray);
}

resolve(stop);
cb && cb(data, analyserArray, last);
};
}

source.connect(pcmProcessingNode);
pcmProcessingNode.connect(context.destination);
Expand All @@ -95,15 +97,16 @@ const createAudioRecorder = (
* @returns Promise, который содержит функцию прерывающую слушание
*/
export const createNavigatorAudioProvider = (
cb: (buffer: ArrayBuffer, analyserArray: Uint8Array | null, last: boolean) => void,
cb?: (buffer: ArrayBuffer, analyserArray: Uint8Array | null, last: boolean) => void,
useAnalyser?: boolean,
onGetPort?: (port: MessagePort) => void,
): Promise<() => void> =>
navigator.mediaDevices
.getUserMedia({
audio: true,
})
.then((stream) => {
return createAudioRecorder(stream, cb, useAnalyser);
return createAudioRecorder(stream, cb, useAnalyser, onGetPort);
})
.catch((err) => {
if (window.location.protocol === 'http:') {
Expand Down
13 changes: 10 additions & 3 deletions src/assistantSdk/voice/listener/voiceListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ type VoiceStreamEvents = {
* @param createAudioProvider Источник голоса
* @returns Api для запуска и остановки слушания
*/
export const createVoiceListener = (createAudioProvider = createNavigatorAudioProvider) => {
export const createVoiceListener = (
createAudioProvider = createNavigatorAudioProvider,
onGetPort?: (port: MessagePort) => void,
) => {
const { emit, on } = createNanoEvents<VoiceStreamEvents>();
let stopRecord: () => void;
let status: VoiceListenerStatus = 'stopped';
Expand All @@ -30,13 +33,17 @@ export const createVoiceListener = (createAudioProvider = createNavigatorAudioPr
emit('status', 'stopped');
};

const listen = (handleVoice: VoiceHandler): Promise<void> => {
const listen = (handleVoice?: VoiceHandler): Promise<void> => {
cancelableToken = { current: false };
let capturedToken = cancelableToken;
status = 'started';
emit('status', 'started');

return createAudioProvider((data, analyser, last) => handleVoice(new Uint8Array(data), analyser, last))
return createAudioProvider(
handleVoice ? (data, analyser, last) => handleVoice(new Uint8Array(data), analyser, last) : undefined,
false,
onGetPort,
)
.then((recStop) => {
stopRecord = recStop;
})
Expand Down
Loading

0 comments on commit b7e5a11

Please sign in to comment.