diff --git a/binding.gyp b/binding.gyp index 5c51682..b22b1d3 100644 --- a/binding.gyp +++ b/binding.gyp @@ -19,6 +19,7 @@ "3rdParty/spandsp/src", "3rdParty/pocketsphinx/include", "3rdParty/pocketsphinx/build/include", + "3rdParty/pjwebsock/websock", " +#include "websock.h" + +PJ_BEGIN_DECL + +enum ws_speech_event +{ + WS_SPEECH_EVENT_EOF, + WS_SPEECH_EVENT_TRANSCRIPT, + WS_SPEECH_EVENT_CONNECTED, + WS_SPEECH_EVENT_CONNECTION_ERROR, + WS_SPEECH_EVENT_DISCONNECTED +}; + +PJ_DEF(pj_status_t) pjmedia_ws_speech_port_create( pj_pool_t *pool, + unsigned clock_rate, + unsigned channel_count, + unsigned samples_per_frame, + unsigned bits_per_sample, + pj_websock_endpoint *ws_endpt, + char *server_url, + void (*cb)(pjmedia_port*, void *user_data, enum ws_speech_event, char *data), + void *cb_user_data, + pjmedia_port **p_port); + +PJ_END_DECL + +#endif /* __WS_SPEECH_PORT_H__ */ diff --git a/src/pjmedia/src/pjmedia/ws_speech_port.c b/src/pjmedia/src/pjmedia/ws_speech_port.c new file mode 100644 index 0000000..dbc2e94 --- /dev/null +++ b/src/pjmedia/src/pjmedia/ws_speech_port.c @@ -0,0 +1,250 @@ +/* $Id: ws_speech_port.c 0000 2024-03-17 mayamatakeshi $ */ +/* + * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include +#include +#include +#include +#include + +#define SIGNATURE PJMEDIA_SIGNATURE('w', 's', 's', 'p') +#define THIS_FILE "ws_speech_port.c" + +#if 0 +# define TRACE_(expr) PJ_LOG(4,expr) +#else +# define TRACE_(expr) +#endif + +static pj_status_t put_frame(pjmedia_port *this_port, + pjmedia_frame *frame); + +static pj_status_t get_frame(pjmedia_port *this_port, + pjmedia_frame *frame); + +static pj_status_t on_destroy(pjmedia_port *this_port); + +#define SPEECH_BUFFER_SIZE 25600 + +struct ws_speech_t +{ + struct pjmedia_port base; + + struct pj_websock_endpoint *ws_endpt; + + void (*cb)(pjmedia_port*, void*, enum ws_speech_event, char*); + void *cb_user_data; + + char speech_buffer[SPEECH_BUFFER_SIZE]; + short size; + + char transcript[4096]; + + pj_websock_t *wc; +}; + + +static pj_bool_t on_connect_complete(pj_websock_t *c, pj_status_t status) +{ + char buf[1000]; + PJ_PERROR(4, (THIS_FILE, status, "%s() %s", __FUNCTION__, + pj_websock_print(c, buf, sizeof(buf)))); + + struct ws_speech_t *port = (struct ws_speech_t*)pj_websock_get_userdata(c); + if (status == PJ_SUCCESS) { + port->cb((pjmedia_port*)port, port->cb_user_data, WS_SPEECH_EVENT_CONNECTED, ""); + } else { + port->cb((pjmedia_port*)port, port->cb_user_data, WS_SPEECH_EVENT_CONNECTION_ERROR, ""); + } + + return PJ_TRUE; +} + +static pj_bool_t on_rx_msg(pj_websock_t *c, + pj_websock_rx_data *msg, + pj_status_t status) +{ + pj_websock_frame_hdr *hdr; + char *data; + char buf[1000]; + + struct ws_speech_t *port = (struct ws_speech_t*)pj_websock_get_userdata(c); + + if (status != PJ_SUCCESS) { + PJ_PERROR(2, (THIS_FILE, status, "#Disconnect with %s", + pj_websock_print(c, buf, sizeof(buf)))); + + port->cb((pjmedia_port*)port, port->cb_user_data, WS_SPEECH_EVENT_DISCONNECTED, ""); + return PJ_FALSE; + } + + hdr = &msg->hdr; + data = (char *)msg->data; + + if (hdr->opcode == PJ_WEBSOCK_OP_TEXT) { + PJ_LOG(4, (THIS_FILE, + "RX from %s:\n" + "TEXT %s %llu/%llu/%llu [%.*s]", + pj_websock_print(c, buf, sizeof(buf)), + hdr->mask ? "(masked)" : "", hdr->len, msg->has_read, + msg->data_len, (int)msg->data_len, data)); + + /* echo response */ + // pj_websock_send(c, hdr->opcode, PJ_TRUE, PJ_FALSE, data, hdr->len); + } else if (hdr->opcode == PJ_WEBSOCK_OP_PING) { + PJ_LOG(4, (THIS_FILE, "RX from %s PING", + pj_websock_print(c, buf, sizeof(buf)))); + /* response pong */ + pj_websock_send(c, PJ_WEBSOCK_OP_PONG, PJ_TRUE, PJ_TRUE, NULL, 0); + } else if (hdr->opcode == PJ_WEBSOCK_OP_PONG) { + PJ_LOG(4, (THIS_FILE, "RX from %s PONG", + pj_websock_print(c, buf, sizeof(buf)))); + } else if (hdr->opcode == PJ_WEBSOCK_OP_CLOSE) { + PJ_LOG(4, (THIS_FILE, "RX from %s CLOSE", + pj_websock_print(c, buf, sizeof(buf)))); + pj_websock_close(c, PJ_WEBSOCK_SC_GOING_AWAY, NULL); + port->cb((pjmedia_port*)port, port->cb_user_data, WS_SPEECH_EVENT_DISCONNECTED, ""); + return PJ_FALSE; /* Must return false to stop read any more */ + } + + return PJ_TRUE; +} + +static void on_state_change(pj_websock_t *c, int state) +{ + char buf[1000]; + PJ_LOG(4, (THIS_FILE, "%s() %s %s", __FUNCTION__, + pj_websock_print(c, buf, sizeof(buf)), + pj_websock_state_str(state))); +} + + +static pj_status_t speech_on_event(pjmedia_event *event, + void *user_data) +{ + struct ws_speech_t *port = (struct ws_speech_t*)user_data; + + if (event->type == PJMEDIA_EVENT_CALLBACK) { + if (port->cb) + (*port->cb)(&port->base, port->cb_user_data, WS_SPEECH_EVENT_TRANSCRIPT, port->transcript); + } + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pjmedia_ws_speech_port_create(pj_pool_t *pool, + unsigned clock_rate, + unsigned channel_count, + unsigned samples_per_frame, + unsigned bits_per_sample, + struct pj_websock_endpoint *ws_endpt, + char *server_url, + void (*cb)(pjmedia_port*, void *user_data, enum ws_speech_event, char *transcript), + void *cb_user_data, + pjmedia_port **p_port) +{ + struct ws_speech_t *port; + const pj_str_t name = pj_str("ws_speech"); + + PJ_ASSERT_RETURN(pool && clock_rate && channel_count == 1 && + samples_per_frame && bits_per_sample == 16 && + p_port != NULL, PJ_EINVAL); + + PJ_ASSERT_RETURN(pool && p_port, PJ_EINVAL); + + PJ_ASSERT_RETURN(cb, PJ_EINVAL); + + port = PJ_POOL_ZALLOC_T(pool, struct ws_speech_t); + PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM); + + pjmedia_port_info_init(&port->base.info, &name, SIGNATURE, clock_rate, + channel_count, bits_per_sample, samples_per_frame); + + port->base.put_frame = &put_frame; + port->base.get_frame = &get_frame; + port->base.on_destroy = &on_destroy; + + port->ws_endpt = ws_endpt, + + port->cb = cb; + port->cb_user_data = cb_user_data; + + pj_websock_http_hdr hdr; + pj_websock_cb ws_cb; + pj_bzero(&cb, sizeof(ws_cb)); + ws_cb.on_connect_complete = on_connect_complete; + ws_cb.on_rx_msg = on_rx_msg; + ws_cb.on_state_change = on_state_change; + + { + hdr.key = pj_str("Sec-WebSocket-Protocol"); + hdr.val = pj_str("pjsip"); + pj_websock_connect(port->ws_endpt, server_url, &ws_cb, port, &hdr, 1, &port->wc); + } + + TRACE_((THIS_FILE, "ws_speech port created: %u/%u/%u/%u", clock_rate, + channel_count, samples_per_frame, bits_per_sample)); + + *p_port = &port->base; + return PJ_SUCCESS; +} + +static pj_status_t put_frame(pjmedia_port *this_port, pjmedia_frame *frame) { + if(frame->type != PJMEDIA_FRAME_TYPE_AUDIO) return PJ_SUCCESS; + + struct ws_speech_t *port = (struct ws_speech_t*) this_port; + + if(port->wc) { + //TODO: write binary data to socket + } + + return PJ_SUCCESS; +} + +static pj_status_t get_frame(pjmedia_port *this_port, pjmedia_frame *frame) { + PJ_ASSERT_RETURN(this_port && frame, PJ_EINVAL); + + struct ws_speech_t *port = (struct ws_speech_t*)this_port; + + if(!port->wc) { + //printf("no data\n"); + frame->type = PJMEDIA_FRAME_TYPE_NONE; + return PJ_SUCCESS; + } + + /* + memcpy(frame->buf, flite->w->samples + flite->written_samples, PJMEDIA_PIA_SPF(&port->info)*2); + flite->written_samples += PJMEDIA_PIA_SPF(&port->info); + frame->type = PJMEDIA_FRAME_TYPE_AUDIO; + */ + //printf("flite data written samples=%i\n", PJMEDIA_PIA_SPF(&port->info)); + + return PJ_SUCCESS; +} + +static pj_status_t on_destroy(pjmedia_port *this_port) +{ + struct ws_speech_t *port = (struct ws_speech_t*) this_port; + + return PJ_SUCCESS; +} + + diff --git a/src/sip.cpp b/src/sip.cpp index 51fe218..4326eea 100644 --- a/src/sip.cpp +++ b/src/sip.cpp @@ -20,10 +20,13 @@ #include "idmanager.hpp" #include "event_templates.hpp" +#include "websock.h" + #include "dtmfdet.h" #include "fax_port.h" #include "flite_port.h" #include "pocketsphinx_port.h" +#include "ws_speech_port.h" #include @@ -60,9 +63,14 @@ IdManager g_dialog_ids(IDS_MAX); #define DEFAULT_CODEC_QUALITY (5) static pjsip_endpoint *g_sip_endpt; -static pj_caching_pool cp; +static pj_caching_pool g_cp; static pj_pool_t *g_pool; static pjmedia_endpt *g_med_endpt; +static pj_timer_heap_t *g_timer_heap = NULL; +static pj_websock_endpoint *g_ws_endpt = NULL; + +#define CERT_FILE "./cert/test.pem" +#define CERT_KEY "./cert/test.key" // static pj_thread_t *g_thread = NULL; // static pj_bool_t g_thread_quit_flag; @@ -623,8 +631,8 @@ bool prepare_dtmfdet(Call *call, AudioEndpoint *ae); bool prepare_wav_player(Call *call, AudioEndpoint *ae, const char *file, unsigned flags, bool end_of_file_event); bool prepare_wav_writer(Call *call, AudioEndpoint *ae, const char *file); bool prepare_fax(Call *call, AudioEndpoint *ae, bool is_sender, const char *file, unsigned flags); -bool prepare_flite(Call *call, AudioEndpoint *ae, const char *voice, bool end_of_speech_event); -bool prepare_pocketsphinx(Call *call, AudioEndpoint *ae); +bool prepare_speech_synth(Call *call, AudioEndpoint *ae, const char *voice, bool end_of_speech_event); +bool prepare_speech_recog(Call *call, AudioEndpoint *ae); void prepare_error_event(ostringstream *oss, char *scope, char *details); // void prepare_pjsipcall_error_event(ostringstream *oss, char *scope, char @@ -1232,27 +1240,37 @@ int __pjw_init() { return 1; } + unsigned log_decor = pj_log_get_decor(); + log_decor |= PJ_LOG_HAS_LEVEL_TEXT; + log_decor |= PJ_LOG_HAS_SENDER; + pj_log_set_decor(log_decor); + + status = pjlib_util_init(); if (status != PJ_SUCCESS) { addon_log(L_DBG, "pj_lib_util_init failed\n"); return 1; } + pj_time_val now; + pj_gettimeofday(&now); + pj_srand((unsigned)now.sec); + pthread_mutex_init(&g_mutex, NULL); pj_log_set_level(0); - pj_caching_pool_init(&cp, &pj_pool_factory_default_policy, 0); + pj_caching_pool_init(&g_cp, &pj_pool_factory_default_policy, 0); char *sip_endpt_name = (char *)"mysip"; - status = pjsip_endpt_create(&cp.factory, sip_endpt_name, &g_sip_endpt); + status = pjsip_endpt_create(&g_cp.factory, sip_endpt_name, &g_sip_endpt); if (status != PJ_SUCCESS) { addon_log(L_DBG, "pjsip_endpt_create failed\n"); return 1; } - g_pool = pj_pool_create(&cp.factory, "tester", 1000, 1000, NULL); + g_pool = pj_pool_create(&g_cp.factory, "tester", 1000, 1000, NULL); /* Create event manager */ status = pjmedia_event_mgr_create(g_pool, 0, NULL); @@ -1351,10 +1369,10 @@ int __pjw_init() { return 1; } #if PJ_HAS_THREADS - status = pjmedia_endpt_create2(&cp.factory, NULL, 1, &g_med_endpt); + status = pjmedia_endpt_create2(&g_cp.factory, NULL, 1, &g_med_endpt); #else status = pjmedia_endpt_create2( - &cp.factory, pjsip_endpt_get_ioqueue(g_sip_endpt), 0, &g_med_endpt); + &g_cp.factory, pjsip_endpt_get_ioqueue(g_sip_endpt), 0, &g_med_endpt); #endif if (status != PJ_SUCCESS) { addon_log(L_DBG, "pjmedia_endpt_create failed\n"); @@ -1447,6 +1465,33 @@ int __pjw_init() { return 1; } + status = pj_timer_heap_create(g_pool, 128, &g_timer_heap); + if (status != PJ_SUCCESS) { + addon_log(L_DBG, "create timer heap error"); + return 1; + } + + + pj_websock_ssl_cert cert; + pj_bzero(&cert, sizeof(cert)); + cert.ca_file = pj_str(CERT_FILE); + cert.cert_file = pj_str(CERT_FILE); + cert.private_file = pj_str(CERT_KEY); + + pj_websock_endpt_cfg opt; + pj_websock_endpt_cfg_default(&opt); + opt.pf = &g_cp.factory; + opt.ioq = pjsip_endpt_get_ioqueue(g_sip_endpt); + opt.timer_heap = g_timer_heap; + opt.cert = &cert; + opt.async_cnt = 3; + + status = pj_websock_endpt_create(&opt, &g_ws_endpt); + if (status != PJ_SUCCESS) { + addon_log(L_DBG, "create websock endpoint error"); + return 1; + } + return 0; } @@ -3789,7 +3834,7 @@ pj_status_t audio_endpoint_start_speech_synth(Call *call, AudioEndpoint *ae, con return -1; } - if (!prepare_flite(call, ae, voice, end_of_speech_event)) { + if (!prepare_speech_synth(call, ae, voice, end_of_speech_event)) { return -1; } @@ -3936,7 +3981,7 @@ pj_status_t audio_endpoint_start_speech_recog(Call *call, AudioEndpoint *ae) { return -1; } - if (!prepare_pocketsphinx(call, ae)) { + if (!prepare_speech_recog(call, ae)) { return -1; } @@ -6838,7 +6883,7 @@ bool prepare_fax(Call *call, AudioEndpoint *ae, bool is_sender, const char *file return connect_feature_port_to_stream_port(call, ae, fp); } -bool prepare_flite(Call *call, AudioEndpoint *ae, const char *voice, bool end_of_speech_event) { +bool prepare_speech_synth(Call *call, AudioEndpoint *ae, const char *voice, bool end_of_speech_event) { pj_status_t status; ConfBridgePort *fp = &ae->feature_cbps[FP_SPEECH_SYNTH]; @@ -6879,7 +6924,7 @@ bool prepare_flite(Call *call, AudioEndpoint *ae, const char *voice, bool end_of return connect_feature_port_to_stream_port(call, ae, fp); } -bool prepare_pocketsphinx(Call *call, AudioEndpoint *ae) { +bool prepare_speech_recog(Call *call, AudioEndpoint *ae) { pj_status_t status; ConfBridgePort *fp = &ae->feature_cbps[FP_SPEECH_RECOG]; @@ -8354,7 +8399,7 @@ static int digit_buffer_thread(void *arg) { bool start_digit_buffer_thread() { pj_status_t status; pj_pool_t *pool = - pj_pool_create(&cp.factory, "digit_buffer_checker", 1000, 1000, NULL); + pj_pool_create(&g_cp.factory, "digit_buffer_checker", 1000, 1000, NULL); pj_thread_t *t; status = pj_thread_create(pool, "digit_buffer_checker", &digit_buffer_thread, NULL, 0, 0, &t);