Skip to content

Commit

Permalink
IO multiplexing for SCO thread
Browse files Browse the repository at this point in the history
This commit introduces IO multiplexing for read/write operations
performed in the SCO thread. From now on, every IO operation is
performed only when the corresponding file descriptor is ready for
particular IO operation. Also, it covers read/write error checking
in a more correct way.

For a simplification of a linear FIFO-like buffer usage, there is
a separate shared piece of code. Due to simplicity and the usage of
the preprocessor, this buffer wrapper should have essentially none
performance impact.
  • Loading branch information
arkq committed Sep 16, 2017
1 parent dae9e75 commit 9769323
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ bin_PROGRAMS = bluealsa
SUBDIRS = asound

bluealsa_SOURCES = \
shared/ffb.c \
shared/log.c \
shared/rt.c \
at.c \
Expand Down
155 changes: 135 additions & 20 deletions src/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "bluealsa.h"
#include "transport.h"
#include "utils.h"
#include "shared/ffb.h"
#include "shared/log.h"
#include "shared/rt.h"

Expand Down Expand Up @@ -978,31 +979,53 @@ void *io_thread_sco(void *arg) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_push(CANCEL_ROUTINE(transport_pthread_cleanup), t);

/* this buffer has to be bigger than SCO MTU */
const size_t buffer_size = 512;
int16_t *buffer = malloc(buffer_size);
/* buffers for transferring data to and fro SCO socket */
struct ffb bt_in = { 0 };
struct ffb bt_out = { 0 };
pthread_cleanup_push(CANCEL_ROUTINE(ffb_free), &bt_in);
pthread_cleanup_push(CANCEL_ROUTINE(ffb_free), &bt_out);

pthread_cleanup_push(CANCEL_ROUTINE(free), buffer);

if (buffer == NULL) {
error("Couldn't create data buffers: %s", strerror(ENOMEM));
/* these buffers shall be bigger than the SCO MTU */
if (ffb_init(&bt_in, 128) == -1 || ffb_init(&bt_out, 128) == -1) {
error("Couldn't create data buffer: %s", strerror(ENOMEM));
goto fail;
}

struct asrsync asrs = { .frames = 0 };
struct pollfd pfds[] = {
{ t->event_fd, POLLIN, 0 },
/* SCO socket */
{ -1, POLLIN, 0 },
{ -1, POLLOUT, 0 },
/* PCM FIFO */
{ -1, POLLIN, 0 },
{ -1, POLLOUT, 0 },
};

debug("Starting IO loop: %s",
bluetooth_profile_to_string(t->profile, t->codec));
for (;;) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

pfds[1].fd = t->sco.mic_pcm.fd != -1 ? t->bt_fd : -1;
pfds[2].fd = t->sco.spk_pcm.fd;
/* fresh-start for file descriptors polling */
pfds[1].fd = pfds[2].fd = -1;
pfds[3].fd = pfds[4].fd = -1;

switch (t->codec) {
case HFP_CODEC_CVSD:
default:
if (ffb_len_in(&bt_in) >= t->mtu_read)
pfds[1].fd = t->bt_fd;
if (ffb_len_out(&bt_out) >= t->mtu_write)
pfds[2].fd = t->bt_fd;
if (ffb_len_in(&bt_out) >= t->mtu_write)
pfds[3].fd = t->sco.spk_pcm.fd;
if (ffb_len_out(&bt_in) >= t->mtu_read)
pfds[4].fd = t->sco.mic_pcm.fd;
}

if (t->sco.mic_pcm.fd == -1)
pfds[1].fd = -1;

if (poll(pfds, sizeof(pfds) / sizeof(*pfds), -1) == -1) {
error("Transport poll error: %s", strerror(errno));
Expand Down Expand Up @@ -1052,15 +1075,25 @@ void *io_thread_sco(void *arg) {
asrsync_init(asrs, transport_get_sampling(t));

if (pfds[1].revents & POLLIN) {
/* dispatch incoming SCO data */

uint8_t *buffer;
size_t buffer_len;
ssize_t len;

retry_sco:
switch (t->codec) {
case HFP_CODEC_CVSD:
default:
buffer = bt_in.tail;
buffer_len = ffb_len_in(&bt_in);
}

retry_sco_read:
errno = 0;
if ((len = read(pfds[1].fd, buffer, buffer_size)) <= 0)
if ((len = read(pfds[1].fd, buffer, buffer_len)) <= 0)
switch (errno) {
case EINTR:
goto retry_sco;
goto retry_sco_read;
case 0:
case ECONNABORTED:
case ECONNRESET:
Expand All @@ -1071,19 +1104,68 @@ void *io_thread_sco(void *arg) {
continue;
}

if (t->sco.mic_muted)
snd_pcm_scale_s16le(buffer, len / sizeof(int16_t), 1, 0, 0);
switch (t->codec) {
case HFP_CODEC_CVSD:
default:
ffb_seek(&bt_in, len);
}

write(t->sco.mic_pcm.fd, buffer, len);
}
else if (pfds[1].revents & (POLLERR | POLLHUP)) {
debug("SCO poll error status: 0x%x", pfds[1].revents);
transport_release_bt_sco(t);
}

if (pfds[2].revents & POLLIN) {
if (pfds[2].revents & POLLOUT) {
/* write-out SCO data */

uint8_t *buffer;
size_t buffer_len;
ssize_t len;

switch (t->codec) {
case HFP_CODEC_CVSD:
default:
buffer = bt_out.data;
buffer_len = t->mtu_write;
}

ssize_t samples = t->mtu_write / sizeof(int16_t);
retry_sco_write:
errno = 0;
if ((len = write(pfds[2].fd, buffer, buffer_len)) <= 0)
switch (errno) {
case EINTR:
goto retry_sco_write;
case 0:
case ECONNABORTED:
case ECONNRESET:
transport_release_bt_sco(t);
continue;
default:
error("SCO write error: %s", strerror(errno));
continue;
}

switch (t->codec) {
case HFP_CODEC_CVSD:
default:
ffb_rewind(&bt_out, len);
}

}

if (pfds[3].revents & POLLIN) {
/* dispatch incoming PCM data */

int16_t *buffer;
ssize_t samples;

switch (t->codec) {
case HFP_CODEC_CVSD:
default:
buffer = (int16_t *)bt_out.tail;
samples = ffb_len_in(&bt_out) / sizeof(int16_t);
}

/* read data from the FIFO - this function will block */
if ((samples = io_thread_read_pcm(&t->sco.spk_pcm, buffer, samples)) <= 0) {
Expand All @@ -1095,10 +1177,42 @@ void *io_thread_sco(void *arg) {
if (t->sco.spk_muted)
snd_pcm_scale_s16le(buffer, samples, 1, 0, 0);

write(t->bt_fd, buffer, samples * sizeof(int16_t));
switch (t->codec) {
case HFP_CODEC_CVSD:
default:
ffb_seek(&bt_out, samples * sizeof(int16_t));
}

}
else if (pfds[2].revents & (POLLERR | POLLHUP)) {
debug("PCM poll error status: 0x%x", pfds[2].revents);
else if (pfds[3].revents & (POLLERR | POLLHUP)) {
debug("PCM poll error status: 0x%x", pfds[3].revents);
}

if (pfds[4].revents & POLLOUT) {
/* write-out PCM data */

int16_t *buffer;
ssize_t samples;

switch (t->codec) {
case HFP_CODEC_CVSD:
default:
buffer = (int16_t *)bt_in.data;
samples = ffb_len_out(&bt_in) / sizeof(int16_t);
}

if (t->sco.mic_muted)
snd_pcm_scale_s16le(buffer, samples, 1, 0, 0);

if (io_thread_write_pcm(&t->sco.mic_pcm, buffer, samples) == -1)
error("FIFO write error: %s", strerror(errno));

switch (t->codec) {
case HFP_CODEC_CVSD:
default:
ffb_rewind(&bt_in, samples * sizeof(int16_t));
}

}

/* keep data transfer at a constant bit rate */
Expand All @@ -1111,5 +1225,6 @@ void *io_thread_sco(void *arg) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
return NULL;
}
41 changes: 41 additions & 0 deletions src/shared/ffb.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* BlueALSA - ffb.c
* Copyright (c) 2016-2017 Arkadiusz Bokowy
*
* This file is a part of bluez-alsa.
*
* This project is licensed under the terms of the MIT license.
*
*/

#include "shared/ffb.h"

#include <stdlib.h>


/**
* Allocated resources for FIFO-like buffer.
*
* @param ffb Pointer to the buffer structure which should be initialized.
* @param size Size of the buffer in bytes.
* @return On success this function returns 0. Otherwise, -1 is returned and
* errno is set to indicate the error. */
int ffb_init(struct ffb *ffb, size_t size) {
if ((ffb->data = ffb->tail = malloc(size)) == NULL)
return -1;
ffb->size = size;
return 0;
}

/**
* Free resources allocated by the ffb_init().
*
* @param ffb Pointer to initialized buffer structure. */
void ffb_free(struct ffb *ffb) {
if (ffb->data == NULL)
return;
free(ffb->data);
ffb->data = NULL;
ffb->tail = NULL;
ffb->size = 0;
}
40 changes: 40 additions & 0 deletions src/shared/ffb.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* BlueALSA - ffb.h
* Copyright (c) 2016-2017 Arkadiusz Bokowy
*
* This file is a part of bluez-alsa.
*
* This project is licensed under the terms of the MIT license.
*
*/

#ifndef BLUEALSA_SHARED_FFB_H_
#define BLUEALSA_SHARED_FFB_H_

#include <stddef.h>
#include <string.h>

/**
* Convenience wrapper for FIFO-like buffer. */
struct ffb {
/* pointer to the allocated memory block */
unsigned char *data;
/* pointer to the end of data */
unsigned char *tail;
/* size of the buffer */
size_t size;
};

int ffb_init(struct ffb *ffb, size_t size);
void ffb_free(struct ffb *ffb);

#define ffb_len_in(p) ((p)->size - ffb_len_out(p))
#define ffb_len_out(p) ((size_t)((p)->tail - (p)->data))

#define ffb_seek(p, len) ((p)->tail += len)
#define ffb_rewind(p, len) do { \
memmove((p)->data, (p)->data + (len), ffb_len_out(p) - (len)); \
(p)->tail -= len; \
} while (0)

#endif
9 changes: 5 additions & 4 deletions test/inc/test.inc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ static char test_error_msg[512];
static char test_warn_msg[512];
static char test_info_msg[512];

#define test_run(f) \
test_error_count = test_warn_count = test_info_count = 0; \
*test_error_msg = *test_warn_msg = *test_info_msg = '\0'; \
assert(!f())
#define test_run(f) do { \
test_error_count = test_warn_count = test_info_count = 0; \
*test_error_msg = *test_warn_msg = *test_info_msg = '\0'; \
assert(!f()); \
} while (0)

void error(const char *format, ...) {
va_list ap;
Expand Down
3 changes: 1 addition & 2 deletions test/test-at.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*
*/

#include <stdlib.h>
#include "inc/test.inc"
#include "../src/at.c"

Expand Down Expand Up @@ -105,5 +104,5 @@ int main(void) {
/* parse +CIND invalid response */
assert(at_parse_cind("(incorrect,1-2)", indmap) == -1);

return EXIT_SUCCESS;
return 0;
}
5 changes: 2 additions & 3 deletions test/test-io.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "../src/rfcomm.c"
#include "../src/transport.c"
#include "../src/utils.c"
#include "../src/shared/ffb.c"
#include "../src/shared/rt.c"

static const a2dp_sbc_t config_sbc_44100_joint_stereo = {
Expand Down Expand Up @@ -174,10 +175,8 @@ int test_a2dp_sbc_encoding(void) {
}

int main(void) {

test_run(test_a2dp_sbc_invalid_setup);
test_run(test_a2dp_sbc_decoding);
test_run(test_a2dp_sbc_encoding);

return EXIT_SUCCESS;
return 0;
}
1 change: 1 addition & 0 deletions test/test-pcm.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <alsa/asoundlib.h>
#include "inc/sine.inc"
#include "inc/test.inc"
#include "../src/shared/ffb.c"
#include "../src/shared/log.h"


Expand Down
3 changes: 2 additions & 1 deletion test/test-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
#define io_thread_a2dp_sink_sbc _io_thread_a2dp_sink_sbc
#define io_thread_a2dp_source_sbc _io_thread_a2dp_source_sbc
#include "../src/io.c"
#include "../src/rfcomm.c"
#undef io_thread_a2dp_sink_sbc
#undef io_thread_a2dp_source_sbc
#include "../src/rfcomm.c"
#define transport_acquire_bt_a2dp _transport_acquire_bt_a2dp
#include "../src/transport.c"
#undef transport_acquire_bt_a2dp
#include "../src/utils.c"
#include "../src/shared/ffb.c"
#include "../src/shared/rt.c"

static const a2dp_sbc_t cconfig = {
Expand Down
Loading

0 comments on commit 9769323

Please sign in to comment.