Skip to content

Commit

Permalink
Inverse priorities of new connections (#506)
Browse files Browse the repository at this point in the history
* Inverse priorities of new connections

* Apply formatting

---------

Co-authored-by: root <[email protected]>
  • Loading branch information
x4m and root authored Jul 12, 2023
1 parent 6ea14ee commit de02985
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 2 deletions.
7 changes: 7 additions & 0 deletions sources/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ static inline od_list_t *od_list_pop(od_list_t *list)
return pop;
}

static inline od_list_t *od_list_pop_back(od_list_t *list)
{
register od_list_t *pop = list->prev;
od_list_unlink(pop);
return pop;
}

static inline int od_list_empty(od_list_t *list)
{
return list->next == list && list->prev == list;
Expand Down
3 changes: 2 additions & 1 deletion sources/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ static inline void od_system_server(void *arg)
while (od_atomic_u32_of(&router->clients_routing) >=
(uint32_t)instance->config.client_max_routing) {
if (!warning_emitted) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(&instance->logger,
"client_max_routing", NULL, client,
"client_max_routing", client, NULL,
"client is waiting in routing queue");
warning_emitted = true;
}
Expand Down
4 changes: 3 additions & 1 deletion sources/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ static inline void od_worker(void *arg)

for (;;) {
machine_msg_t *msg;
msg = machine_channel_read(worker->task_channel, UINT32_MAX);
/* Inverse priorities of cliend routing to decrease chances of timeout */
msg = machine_channel_read_back(worker->task_channel,
UINT32_MAX);
if (msg == NULL)
break;

Expand Down
49 changes: 49 additions & 0 deletions third_party/machinarium/sources/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,52 @@ mm_msg_t *mm_channel_read(mm_channel_t *channel, uint32_t time_ms)

return reader.result;
}

mm_msg_t *mm_channel_read_back(mm_channel_t *channel, uint32_t time_ms)
{
/* try to get first message, if no other readers are
* waiting, otherwise put reader in the wait
* channel */
mm_sleeplock_lock(&channel->lock);

mm_list_t *next;
if ((channel->msg_list_count > 0) && (channel->readers_count == 0)) {
next = mm_list_pop_back(&channel->msg_list);
channel->msg_list_count--;
mm_sleeplock_unlock(&channel->lock);
return mm_container_of(next, mm_msg_t, link);
}

/* put reader into channel and register event */
mm_channelrd_t reader;
reader.result = NULL;
mm_list_init(&reader.link);
mm_eventmgr_add(&mm_self->event_mgr, &reader.event);

mm_list_append(&channel->readers, &reader.link);
channel->readers_count++;

mm_sleeplock_unlock(&channel->lock);

/* wait for cancel, timedout or writer event */
mm_eventmgr_wait(&mm_self->event_mgr, &reader.event, time_ms);

mm_sleeplock_lock(&channel->lock);

if (!reader.result) {
assert(channel->readers_count > 0);
channel->readers_count--;
mm_list_unlink(&reader.link);
}

mm_sleeplock_unlock(&channel->lock);

/* timedout or cancel */
if (reader.event.call.status != 0) {
if (reader.result)
mm_msg_unref(&mm_self->msg_cache, reader.result);
return NULL;
}

return reader.result;
}
1 change: 1 addition & 0 deletions third_party/machinarium/sources/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ void mm_channel_free(mm_channel_t *);
mm_retcode_t mm_channel_write(mm_channel_t *, mm_msg_t *);

mm_msg_t *mm_channel_read(mm_channel_t *, uint32_t);
mm_msg_t *mm_channel_read_back(mm_channel_t *, uint32_t);

#endif /* MM_CHANNEL_H */
19 changes: 19 additions & 0 deletions third_party/machinarium/sources/channel_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,22 @@ MACHINE_API machine_msg_t *machine_channel_read(machine_channel_t *obj,
msg = mm_channelfast_read(channel, time_ms);
return (machine_msg_t *)msg;
}

MACHINE_API machine_msg_t *machine_channel_read_back(machine_channel_t *obj,
uint32_t time_ms)
{
mm_channeltype_t *type;
type = mm_cast(mm_channeltype_t *, obj);
if (type->is_shared) {
mm_channel_t *channel;
channel = mm_cast(mm_channel_t *, obj);
mm_msg_t *msg;
msg = mm_channel_read_back(channel, time_ms);
return (machine_msg_t *)msg;
}
mm_channelfast_t *channel;
channel = mm_cast(mm_channelfast_t *, obj);
mm_msg_t *msg;
msg = mm_channelfast_read(channel, time_ms);
return (machine_msg_t *)msg;
}
7 changes: 7 additions & 0 deletions third_party/machinarium/sources/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ static inline mm_list_t *mm_list_pop(mm_list_t *list)
return pop;
}

static inline mm_list_t *mm_list_pop_back(mm_list_t *list)
{
register mm_list_t *pop = list->prev;
mm_list_unlink(pop);
return pop;
}

#define mm_list_foreach(H, I) for (I = (H)->next; I != H; I = (I)->next)

#define mm_list_foreach_safe(H, I, N) \
Expand Down
3 changes: 3 additions & 0 deletions third_party/machinarium/sources/machinarium.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ MACHINE_API mm_retcode_t machine_channel_write(machine_channel_t *,
MACHINE_API machine_msg_t *machine_channel_read(machine_channel_t *,
uint32_t time_ms);

MACHINE_API machine_msg_t *machine_channel_read_back(machine_channel_t *,
uint32_t time_ms);

/* tls */

MACHINE_API machine_tls_t *machine_tls_create(void);
Expand Down

0 comments on commit de02985

Please sign in to comment.