From 9769323899cfbaf21c1d8710eaa997841b127095 Mon Sep 17 00:00:00 2001 From: Arkadiusz Bokowy Date: Sat, 16 Sep 2017 17:22:47 +0200 Subject: [PATCH] IO multiplexing for SCO thread This commit introduces IO multiplexing for read/write operations performed in the SCO thread. From now on, every IO operation is performed only when the corresponding file descriptor is ready for particular IO operation. Also, it covers read/write error checking in a more correct way. For a simplification of a linear FIFO-like buffer usage, there is a separate shared piece of code. Due to simplicity and the usage of the preprocessor, this buffer wrapper should have essentially none performance impact. --- src/Makefile.am | 1 + src/io.c | 155 +++++++++++++++++++++++++++++++++++++++------ src/shared/ffb.c | 41 ++++++++++++ src/shared/ffb.h | 40 ++++++++++++ test/inc/test.inc | 9 +-- test/test-at.c | 3 +- test/test-io.c | 5 +- test/test-pcm.c | 1 + test/test-server.c | 3 +- test/test-utils.c | 30 ++++++++- 10 files changed, 255 insertions(+), 33 deletions(-) create mode 100644 src/shared/ffb.c create mode 100644 src/shared/ffb.h diff --git a/src/Makefile.am b/src/Makefile.am index 99ebf57c9..b7c605f1a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -5,6 +5,7 @@ bin_PROGRAMS = bluealsa SUBDIRS = asound bluealsa_SOURCES = \ + shared/ffb.c \ shared/log.c \ shared/rt.c \ at.c \ diff --git a/src/io.c b/src/io.c index d211d5ad1..675778b12 100644 --- a/src/io.c +++ b/src/io.c @@ -36,6 +36,7 @@ #include "bluealsa.h" #include "transport.h" #include "utils.h" +#include "shared/ffb.h" #include "shared/log.h" #include "shared/rt.h" @@ -978,22 +979,27 @@ void *io_thread_sco(void *arg) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_push(CANCEL_ROUTINE(transport_pthread_cleanup), t); - /* this buffer has to be bigger than SCO MTU */ - const size_t buffer_size = 512; - int16_t *buffer = malloc(buffer_size); + /* buffers for transferring data to and fro SCO socket */ + struct ffb bt_in = { 0 }; + struct ffb bt_out = { 0 }; + pthread_cleanup_push(CANCEL_ROUTINE(ffb_free), &bt_in); + pthread_cleanup_push(CANCEL_ROUTINE(ffb_free), &bt_out); - pthread_cleanup_push(CANCEL_ROUTINE(free), buffer); - - if (buffer == NULL) { - error("Couldn't create data buffers: %s", strerror(ENOMEM)); + /* these buffers shall be bigger than the SCO MTU */ + if (ffb_init(&bt_in, 128) == -1 || ffb_init(&bt_out, 128) == -1) { + error("Couldn't create data buffer: %s", strerror(ENOMEM)); goto fail; } struct asrsync asrs = { .frames = 0 }; struct pollfd pfds[] = { { t->event_fd, POLLIN, 0 }, + /* SCO socket */ { -1, POLLIN, 0 }, + { -1, POLLOUT, 0 }, + /* PCM FIFO */ { -1, POLLIN, 0 }, + { -1, POLLOUT, 0 }, }; debug("Starting IO loop: %s", @@ -1001,8 +1007,25 @@ void *io_thread_sco(void *arg) { for (;;) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pfds[1].fd = t->sco.mic_pcm.fd != -1 ? t->bt_fd : -1; - pfds[2].fd = t->sco.spk_pcm.fd; + /* fresh-start for file descriptors polling */ + pfds[1].fd = pfds[2].fd = -1; + pfds[3].fd = pfds[4].fd = -1; + + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + if (ffb_len_in(&bt_in) >= t->mtu_read) + pfds[1].fd = t->bt_fd; + if (ffb_len_out(&bt_out) >= t->mtu_write) + pfds[2].fd = t->bt_fd; + if (ffb_len_in(&bt_out) >= t->mtu_write) + pfds[3].fd = t->sco.spk_pcm.fd; + if (ffb_len_out(&bt_in) >= t->mtu_read) + pfds[4].fd = t->sco.mic_pcm.fd; + } + + if (t->sco.mic_pcm.fd == -1) + pfds[1].fd = -1; if (poll(pfds, sizeof(pfds) / sizeof(*pfds), -1) == -1) { error("Transport poll error: %s", strerror(errno)); @@ -1052,15 +1075,25 @@ void *io_thread_sco(void *arg) { asrsync_init(asrs, transport_get_sampling(t)); if (pfds[1].revents & POLLIN) { + /* dispatch incoming SCO data */ + uint8_t *buffer; + size_t buffer_len; ssize_t len; -retry_sco: + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + buffer = bt_in.tail; + buffer_len = ffb_len_in(&bt_in); + } + +retry_sco_read: errno = 0; - if ((len = read(pfds[1].fd, buffer, buffer_size)) <= 0) + if ((len = read(pfds[1].fd, buffer, buffer_len)) <= 0) switch (errno) { case EINTR: - goto retry_sco; + goto retry_sco_read; case 0: case ECONNABORTED: case ECONNRESET: @@ -1071,19 +1104,68 @@ void *io_thread_sco(void *arg) { continue; } - if (t->sco.mic_muted) - snd_pcm_scale_s16le(buffer, len / sizeof(int16_t), 1, 0, 0); + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + ffb_seek(&bt_in, len); + } - write(t->sco.mic_pcm.fd, buffer, len); } else if (pfds[1].revents & (POLLERR | POLLHUP)) { debug("SCO poll error status: 0x%x", pfds[1].revents); transport_release_bt_sco(t); } - if (pfds[2].revents & POLLIN) { + if (pfds[2].revents & POLLOUT) { + /* write-out SCO data */ + + uint8_t *buffer; + size_t buffer_len; + ssize_t len; + + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + buffer = bt_out.data; + buffer_len = t->mtu_write; + } - ssize_t samples = t->mtu_write / sizeof(int16_t); +retry_sco_write: + errno = 0; + if ((len = write(pfds[2].fd, buffer, buffer_len)) <= 0) + switch (errno) { + case EINTR: + goto retry_sco_write; + case 0: + case ECONNABORTED: + case ECONNRESET: + transport_release_bt_sco(t); + continue; + default: + error("SCO write error: %s", strerror(errno)); + continue; + } + + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + ffb_rewind(&bt_out, len); + } + + } + + if (pfds[3].revents & POLLIN) { + /* dispatch incoming PCM data */ + + int16_t *buffer; + ssize_t samples; + + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + buffer = (int16_t *)bt_out.tail; + samples = ffb_len_in(&bt_out) / sizeof(int16_t); + } /* read data from the FIFO - this function will block */ if ((samples = io_thread_read_pcm(&t->sco.spk_pcm, buffer, samples)) <= 0) { @@ -1095,10 +1177,42 @@ void *io_thread_sco(void *arg) { if (t->sco.spk_muted) snd_pcm_scale_s16le(buffer, samples, 1, 0, 0); - write(t->bt_fd, buffer, samples * sizeof(int16_t)); + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + ffb_seek(&bt_out, samples * sizeof(int16_t)); + } + } - else if (pfds[2].revents & (POLLERR | POLLHUP)) { - debug("PCM poll error status: 0x%x", pfds[2].revents); + else if (pfds[3].revents & (POLLERR | POLLHUP)) { + debug("PCM poll error status: 0x%x", pfds[3].revents); + } + + if (pfds[4].revents & POLLOUT) { + /* write-out PCM data */ + + int16_t *buffer; + ssize_t samples; + + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + buffer = (int16_t *)bt_in.data; + samples = ffb_len_out(&bt_in) / sizeof(int16_t); + } + + if (t->sco.mic_muted) + snd_pcm_scale_s16le(buffer, samples, 1, 0, 0); + + if (io_thread_write_pcm(&t->sco.mic_pcm, buffer, samples) == -1) + error("FIFO write error: %s", strerror(errno)); + + switch (t->codec) { + case HFP_CODEC_CVSD: + default: + ffb_rewind(&bt_in, samples * sizeof(int16_t)); + } + } /* keep data transfer at a constant bit rate */ @@ -1111,5 +1225,6 @@ void *io_thread_sco(void *arg) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(1); pthread_cleanup_pop(1); + pthread_cleanup_pop(1); return NULL; } diff --git a/src/shared/ffb.c b/src/shared/ffb.c new file mode 100644 index 000000000..13bb6125e --- /dev/null +++ b/src/shared/ffb.c @@ -0,0 +1,41 @@ +/* + * BlueALSA - ffb.c + * Copyright (c) 2016-2017 Arkadiusz Bokowy + * + * This file is a part of bluez-alsa. + * + * This project is licensed under the terms of the MIT license. + * + */ + +#include "shared/ffb.h" + +#include + + +/** + * Allocated resources for FIFO-like buffer. + * + * @param ffb Pointer to the buffer structure which should be initialized. + * @param size Size of the buffer in bytes. + * @return On success this function returns 0. Otherwise, -1 is returned and + * errno is set to indicate the error. */ +int ffb_init(struct ffb *ffb, size_t size) { + if ((ffb->data = ffb->tail = malloc(size)) == NULL) + return -1; + ffb->size = size; + return 0; +} + +/** + * Free resources allocated by the ffb_init(). + * + * @param ffb Pointer to initialized buffer structure. */ +void ffb_free(struct ffb *ffb) { + if (ffb->data == NULL) + return; + free(ffb->data); + ffb->data = NULL; + ffb->tail = NULL; + ffb->size = 0; +} diff --git a/src/shared/ffb.h b/src/shared/ffb.h new file mode 100644 index 000000000..9021d6142 --- /dev/null +++ b/src/shared/ffb.h @@ -0,0 +1,40 @@ +/* + * BlueALSA - ffb.h + * Copyright (c) 2016-2017 Arkadiusz Bokowy + * + * This file is a part of bluez-alsa. + * + * This project is licensed under the terms of the MIT license. + * + */ + +#ifndef BLUEALSA_SHARED_FFB_H_ +#define BLUEALSA_SHARED_FFB_H_ + +#include +#include + +/** + * Convenience wrapper for FIFO-like buffer. */ +struct ffb { + /* pointer to the allocated memory block */ + unsigned char *data; + /* pointer to the end of data */ + unsigned char *tail; + /* size of the buffer */ + size_t size; +}; + +int ffb_init(struct ffb *ffb, size_t size); +void ffb_free(struct ffb *ffb); + +#define ffb_len_in(p) ((p)->size - ffb_len_out(p)) +#define ffb_len_out(p) ((size_t)((p)->tail - (p)->data)) + +#define ffb_seek(p, len) ((p)->tail += len) +#define ffb_rewind(p, len) do { \ + memmove((p)->data, (p)->data + (len), ffb_len_out(p) - (len)); \ + (p)->tail -= len; \ + } while (0) + +#endif diff --git a/test/inc/test.inc b/test/inc/test.inc index 98c86b1ad..b2a5cba7d 100644 --- a/test/inc/test.inc +++ b/test/inc/test.inc @@ -25,10 +25,11 @@ static char test_error_msg[512]; static char test_warn_msg[512]; static char test_info_msg[512]; -#define test_run(f) \ - test_error_count = test_warn_count = test_info_count = 0; \ - *test_error_msg = *test_warn_msg = *test_info_msg = '\0'; \ - assert(!f()) +#define test_run(f) do { \ + test_error_count = test_warn_count = test_info_count = 0; \ + *test_error_msg = *test_warn_msg = *test_info_msg = '\0'; \ + assert(!f()); \ + } while (0) void error(const char *format, ...) { va_list ap; diff --git a/test/test-at.c b/test/test-at.c index 8a0fc8c5b..88edfd90f 100644 --- a/test/test-at.c +++ b/test/test-at.c @@ -8,7 +8,6 @@ * */ -#include #include "inc/test.inc" #include "../src/at.c" @@ -105,5 +104,5 @@ int main(void) { /* parse +CIND invalid response */ assert(at_parse_cind("(incorrect,1-2)", indmap) == -1); - return EXIT_SUCCESS; + return 0; } diff --git a/test/test-io.c b/test/test-io.c index 4cc238024..d4be542ee 100644 --- a/test/test-io.c +++ b/test/test-io.c @@ -18,6 +18,7 @@ #include "../src/rfcomm.c" #include "../src/transport.c" #include "../src/utils.c" +#include "../src/shared/ffb.c" #include "../src/shared/rt.c" static const a2dp_sbc_t config_sbc_44100_joint_stereo = { @@ -174,10 +175,8 @@ int test_a2dp_sbc_encoding(void) { } int main(void) { - test_run(test_a2dp_sbc_invalid_setup); test_run(test_a2dp_sbc_decoding); test_run(test_a2dp_sbc_encoding); - - return EXIT_SUCCESS; + return 0; } diff --git a/test/test-pcm.c b/test/test-pcm.c index 05d06f738..bf558978f 100644 --- a/test/test-pcm.c +++ b/test/test-pcm.c @@ -16,6 +16,7 @@ #include #include "inc/sine.inc" #include "inc/test.inc" +#include "../src/shared/ffb.c" #include "../src/shared/log.h" diff --git a/test/test-server.c b/test/test-server.c index f4a617834..3f1dd0dbf 100644 --- a/test/test-server.c +++ b/test/test-server.c @@ -28,13 +28,14 @@ #define io_thread_a2dp_sink_sbc _io_thread_a2dp_sink_sbc #define io_thread_a2dp_source_sbc _io_thread_a2dp_source_sbc #include "../src/io.c" -#include "../src/rfcomm.c" #undef io_thread_a2dp_sink_sbc #undef io_thread_a2dp_source_sbc +#include "../src/rfcomm.c" #define transport_acquire_bt_a2dp _transport_acquire_bt_a2dp #include "../src/transport.c" #undef transport_acquire_bt_a2dp #include "../src/utils.c" +#include "../src/shared/ffb.c" #include "../src/shared/rt.c" static const a2dp_sbc_t cconfig = { diff --git a/test/test-utils.c b/test/test-utils.c index 5f5282f44..61335b3d5 100644 --- a/test/test-utils.c +++ b/test/test-utils.c @@ -10,6 +10,7 @@ #include "inc/test.inc" #include "../src/utils.c" +#include "../src/shared/ffb.c" #include "../src/shared/rt.c" int test_dbus_profile_object_path(void) { @@ -127,11 +128,34 @@ int test_difftimespec(void) { return 0; } -int main(void) { +int test_fifo_buffer(void) { + + struct ffb ffb = { 0 }; + + assert(ffb_init(&ffb, 64) == 0); + assert(ffb.data == ffb.tail); + assert(ffb.size == 64); + + memcpy(ffb.data, "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ", 36); + ffb_seek(&ffb, 36); + + assert(ffb_len_in(&ffb) == 64 - 36); + assert(ffb_len_out(&ffb) == 36); + assert(ffb.tail[-1] == 'Z'); + + ffb_rewind(&ffb, 15); + assert(ffb_len_in(&ffb) == 64 - (36 - 15)); + assert(ffb_len_out(&ffb) == 36 - 15); + assert(memcmp(ffb.data, "FGHIJKLMNOPQRSTUVWXYZ", ffb_len_out(&ffb)) == 0); + assert(ffb.tail[-1] == 'Z'); + return 0; +} + +int main(void) { test_run(test_dbus_profile_object_path); test_run(test_pcm_scale_s16le); test_run(test_difftimespec); - - return EXIT_SUCCESS; + test_run(test_fifo_buffer); + return 0; }