Skip to content

Commit

Permalink
new with with additional mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
catkira committed Nov 7, 2024
1 parent 7f4bfcb commit 6819776
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 12 deletions.
7 changes: 7 additions & 0 deletions buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,23 @@ void iio_buffer_destroy(struct iio_buffer *buf)
{
const struct iio_backend_ops *ops = buf->dev->ctx->ops;

printf("iio_buffer_cancel...\n");
iio_buffer_cancel(buf);
printf("iio_buffer_cancel done\n");

printf("free_buffer...\n");
if (ops->free_buffer)
ops->free_buffer(buf->pdata);
printf("free_buffer done\n");

printf("iio_task_destroy...\n");
iio_task_destroy(buf->worker);
printf("iio_task_destroy done\n");
iio_mutex_destroy(buf->lock);
iio_channels_mask_destroy(buf->mask);
free(buf->attrlist.attrs);
free(buf);
printf("iio_buffer_destroy done\n");
}

const struct iio_channels_mask *
Expand Down
5 changes: 5 additions & 0 deletions iiod-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,15 @@ struct iiod_client * iiod_client_new(const struct iio_context_params *params,
void iiod_client_destroy(struct iiod_client *client)
{
if (client->responder) {
printf("iiod_client_destroy1\n");
iiod_client_cancel(client);
printf("iiod_client_destroy2\n");
iiod_responder_destroy(client->responder);
printf("iiod_client_destroy3\n");
}

iio_mutex_destroy(client->lock);
printf("iiod_client_destroy4\n");
free(client);
}

Expand Down Expand Up @@ -510,6 +514,7 @@ static ssize_t iiod_client_read_attr_new(struct iiod_client *client,
char *dest, size_t len)
{
struct iiod_io *io = iiod_responder_get_default_io(client->responder);
printf("* thread = %u, iiod_responder_get_default_io = %d\n", pthread_self(), io);
const struct iio_channel *chn;
const struct iio_device *dev;
const struct iio_buffer *buf;
Expand Down
89 changes: 80 additions & 9 deletions iiod-responder.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct iiod_io {
/* Cond to sleep until I/O is done */
struct iio_cond *cond;
struct iio_mutex *lock;
struct iio_mutex *inuse_lock;

/* Set to true when the response has been read */
bool r_done;
Expand Down Expand Up @@ -114,6 +115,16 @@ static void __iiod_io_cancel_unlocked(struct iiod_io *io)
struct iiod_io *tmp;

/* Discard the entry from the readers list */
printf("thread = %u, discard reader %d\n", pthread_self(), io);
if (priv->readers) {
for (tmp = priv->readers; tmp; ) {
if (tmp == tmp->r_next) {
printf("loop detected!!! %d -> %d\n", tmp, tmp->r_next);
exit(1);
}
tmp = tmp->r_next;
}
}
if (io == priv->readers) {
priv->readers = io->r_next;
} else if (priv->readers) {
Expand Down Expand Up @@ -154,8 +165,11 @@ static ssize_t iiod_rw_all(struct iiod_responder *priv,
nb = 1;
}

if (is_read)
if (is_read) {
printf("thread = %u, read...\n", pthread_self());
ret = priv->ops->read(priv->d, curr, nb);
printf("thread = %u, read done\n", pthread_self());
}
else
ret = priv->ops->write(priv->d, curr, nb);
if (ret <= 0)
Expand Down Expand Up @@ -224,6 +238,7 @@ static void iiod_responder_signal_io(struct iiod_io *io, int32_t code)
io->r_done = true;
iio_cond_signal(io->cond);
iio_mutex_unlock(io->lock);
iio_mutex_unlock(io->inuse_lock);
}

static void iiod_responder_cancel_responses(struct iiod_responder *priv)
Expand All @@ -250,6 +265,7 @@ static int iiod_responder_reader_worker(struct iiod_responder *priv)
ok_buf.size = 3;

iio_mutex_lock(priv->lock);
printf("thread = %u, start reader...\n", pthread_self());

while (!priv->thrd_stop) {
iio_mutex_unlock(priv->lock);
Expand All @@ -268,7 +284,9 @@ static int iiod_responder_reader_worker(struct iiod_responder *priv)
continue;
}

printf("thread = %u, try lock...\n", pthread_self());
iio_mutex_lock(priv->lock);
printf("thread = %u, locked\n", pthread_self());
if (ret <= 0)
break;

Expand All @@ -277,7 +295,10 @@ static int iiod_responder_reader_worker(struct iiod_responder *priv)

ret = iiod_run_command(priv, &cmd);

printf("thread = %u, try lock...\n", pthread_self());
iio_mutex_lock(priv->lock);
printf("thread = %u, locked\n", pthread_self());

if (ret < 0)
break;

Expand All @@ -286,16 +307,21 @@ static int iiod_responder_reader_worker(struct iiod_responder *priv)

/* Find the client for the given ID in the readers list */
for (io = priv->readers; io; io = io->r_next) {
if (io->client_id == cmd.client_id)
if (io->client_id == cmd.client_id) {
printf("* thread = %u, selecting io %u with client_id %d\n", pthread_self(), io, io->client_id);
break;
}
}

if (!io) {
/* We received a response, but have no client waiting
* for it, so drop it. */
iio_mutex_unlock(priv->lock);
iiod_discard_data(priv, cmd.code);
printf("thread = %u, try lock...\n", pthread_self());
iio_mutex_lock(priv->lock);
printf("thread = %u, locked\n", pthread_self());

continue;
}

Expand Down Expand Up @@ -328,6 +354,7 @@ static int iiod_responder_reader_worker(struct iiod_responder *priv)
iiod_responder_signal_io(io, cmd.code);
iiod_io_unref_unlocked(io);
}
printf("thread = %u, stop reader...\n", pthread_self());

priv->thrd_err_code = priv->thrd_stop ? -EINTR : (int) ret;
priv->thrd_stop = true;
Expand Down Expand Up @@ -488,6 +515,7 @@ int32_t iiod_io_wait_for_response(struct iiod_io *io)

io->r_io.cmd.code = ret;
io->r_done = true;
iio_mutex_unlock(io->inuse_lock);
break;
}
}
Expand Down Expand Up @@ -557,6 +585,7 @@ int iiod_io_get_response_async(struct iiod_io *io,
return priv->thrd_err_code;
}

iio_mutex_lock(io->inuse_lock);
if (nb)
memcpy(io->r_io.buf, buf, sizeof(*buf) * nb);
io->r_io.nb_buf = nb;
Expand All @@ -565,14 +594,37 @@ int iiod_io_get_response_async(struct iiod_io *io,
io->r_io.start_time = read_counter_us();

/* Add it to the readers list */
if (!priv->readers) {
priv->readers = io;
} else {
for (tmp = priv->readers; tmp->r_next; )
printf("thread = %u, adding reader %d\n", pthread_self(), io);
bool found = false;
if (priv->readers) {
for (tmp = priv->readers; tmp; ) {
if (tmp == io)
{
printf("reader is already in list!!\n");
found = true;
}
if (tmp == tmp->r_next) {
printf("loop detected!!!\n");
}
tmp = tmp->r_next;
tmp->r_next = io;
io->r_next = NULL;
}
}
if (!found) {
if (!priv->readers) {
priv->readers = io;
} else {
unsigned int i = 0;
for (tmp = priv->readers; tmp->r_next; ) {
printf("list item %d = %d\n", i++,tmp);
tmp = tmp->r_next;
if (i > 100) exit(1);
}
tmp->r_next = io;
printf("io->r_next = %d\n", io->r_next);
// io->r_next = NULL;
}
}
else exit(1);

iio_mutex_unlock(priv->lock);

Expand Down Expand Up @@ -623,6 +675,11 @@ iiod_responder_create_io(struct iiod_responder *priv, uint16_t id)
if (err)
goto err_free_cond;

io->inuse_lock = iio_mutex_create();
err = iio_err(io->inuse_lock);
if (err)
goto err_free_cond;

io->client_id = id;

return io;
Expand Down Expand Up @@ -706,21 +763,35 @@ void iiod_responder_stop(struct iiod_responder *priv)

void iiod_responder_destroy(struct iiod_responder *priv)
{
printf("thread = %u, iiod_responder_destroy1\n", pthread_self());
iiod_responder_stop(priv);
printf("iiod_responder_destroy2\n");
iiod_responder_wait_done(priv);

printf("iiod_responder_destroy3\n");
iio_task_destroy(priv->write_task);
printf("iiod_responder_destroy4\n");

iiod_io_unref(priv->default_io);
printf("iiod_responder_destroy5\n");
iio_mutex_destroy(priv->lock);
free(priv);
}

struct iio_thrd {
pthread_t thid;
void *d;
int (*func)(void *);
};

void iiod_responder_wait_done(struct iiod_responder *priv)
{
if (!NO_THREADS) {
if (priv->read_thrd)
if (priv->read_thrd) {
printf("thread = %u, iio_thrd_join_and_destroy thread = %u...\n", pthread_self(), priv->read_thrd->thid);
iio_thrd_join_and_destroy(priv->read_thrd);
printf("iio_thrd_join_and_destroy done\n ");
}
priv->read_thrd = NULL;
} else if (!priv->thrd_stop) {
iiod_responder_reader_worker(priv);
Expand Down
1 change: 1 addition & 0 deletions local.c
Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,7 @@ local_create_buffer(const struct iio_device *dev, unsigned int idx,

static void local_free_buffer(struct iio_buffer_pdata *pdata)
{
printf("local_free_buffer\n");
free(pdata->pdata);
local_close_fd(pdata->dev, pdata->fd);
close(pdata->cancel_fd);
Expand Down
10 changes: 8 additions & 2 deletions network-unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static int create_cancel_fd(struct iiod_client_pdata *io_ctx)
io_ctx->cancel_fd[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (io_ctx->cancel_fd[0] < 0)
return -errno;
printf("created cancel_fd %d\n", io_ctx->cancel_fd[0]);
return 0;
}

Expand Down Expand Up @@ -76,6 +77,7 @@ static int create_cancel_fd(struct iiod_client_pdata *io_ctx)

void cleanup_cancel(struct iiod_client_pdata *io_ctx)
{
printf("cleanup_cancel\n");
close(io_ctx->cancel_fd[0]);
if (!WITH_NETWORK_EVENTFD)
close(io_ctx->cancel_fd[1]);
Expand All @@ -90,15 +92,17 @@ int setup_cancel(struct iiod_client_pdata *io_ctx)

void do_cancel(struct iiod_client_pdata *io_ctx)
{
printf("do cancel... fd = %d\n", io_ctx->cancel_fd[0]);
uint64_t event = 1;
int ret;

ret = write(io_ctx->cancel_fd[CANCEL_WR_FD], &event, sizeof(event));
if (ret == -1) {
/* If this happens something went very seriously wrong */
prm_perror(io_ctx->params, -errno,
"Unable to signal cancellation event");
printf("do cancel error!\n");
}
printf("do cancel done\n");
}

int wait_cancellable(struct iiod_client_pdata *io_ctx,
Expand All @@ -116,12 +120,14 @@ int wait_cancellable(struct iiod_client_pdata *io_ctx,
else
pfd[0].events = POLLOUT;
pfd[1].fd = io_ctx->cancel_fd[0];
pfd[1].events = POLLIN;
pfd[1].events = POLLIN | POLLPRI | POLLHUP | POLLERR;
printf("thread = %u, poll cancel_fd %d\n", pthread_self(), pfd[1].fd);

do {
do {
ret = poll(pfd, 2, timeout);
} while (ret == -1 && errno == EINTR);
printf("thread = %u, pfd[0].revents = %d, pfd[1].revents = %d, ret = %d\n",pthread_self(), pfd[0].revents,pfd[1].revents, ret);

if (ret == -1)
return -errno;
Expand Down
10 changes: 10 additions & 0 deletions network.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ static ssize_t network_recv(struct iiod_client_pdata *io_ctx, void *data,
#ifdef __linux__
cancellable &= !(flags & MSG_DONTWAIT);
#endif
printf("thread = %u, network_recv cancellable = %d\n", pthread_self(), cancellable);

while (1) {
printf("thread = %u, wait_cancellable...\n", pthread_self());
if (cancellable) {
ret = wait_cancellable(io_ctx, true, timeout_ms);
if (ret < 0)
return ret;
}
printf("thread = %u, wait_cancellable done\n", pthread_self());

ret = recv(io_ctx->fd, data, (int) len, flags);
if (ret == 0)
Expand All @@ -119,6 +122,7 @@ static ssize_t network_recv(struct iiod_client_pdata *io_ctx, void *data,
return (ssize_t) err;
}
}
printf("thread = %u, network_recv done\n", pthread_self());
return ret;
}

Expand Down Expand Up @@ -362,8 +366,11 @@ network_setup_iiod_client(const struct iio_device *dev,
static void network_free_iiod_client(struct iiod_client *client,
struct iiod_client_pdata *io_ctx)
{
printf("network_free_iiod_client1 cancel_fd = %d\n", io_ctx->cancel_fd[0]);
iiod_client_destroy(client);
printf("network_free_iiod_client2\n");
cleanup_cancel(io_ctx);
printf("network_free_iiod_client3\n");
close(io_ctx->fd);
io_ctx->fd = -1;
}
Expand Down Expand Up @@ -480,8 +487,11 @@ network_create_buffer(const struct iio_device *dev, unsigned int idx,

void network_free_buffer(struct iio_buffer_pdata *pdata)
{
printf("thread = %u, free1 fd = %d\n", pthread_self(), pdata->io_ctx.cancel_fd[0]);
iiod_client_free_buffer(pdata->pdata);
printf("thread = %u, free2\n", pthread_self());
network_free_iiod_client(pdata->iiod_client, &pdata->io_ctx);
printf("thread = %u, free3\n", pthread_self());
free(pdata);
}

Expand Down
Loading

0 comments on commit 6819776

Please sign in to comment.