Skip to content

Commit

Permalink
PROTON-2546: Rearrange how raw connection generates events
Browse files Browse the repository at this point in the history
Also Implement a proactor forced cleanup for raw connections to try to
limit leaking of events on forced shutdowns.
  • Loading branch information
astitcher committed May 27, 2022
1 parent e23bcf4 commit e2fa36f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 1 deletion.
2 changes: 2 additions & 0 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,11 @@ psocket_t *pni_task_raw_psocket(task_t *t);
pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready);

typedef struct praw_connection_t praw_connection_t;
praw_connection_t *pni_task_raw_connection(task_t *t);
task_t *pni_raw_connection_task(praw_connection_t *rc);
praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
void pni_raw_connection_done(praw_connection_t *rc);
void pni_raw_connection_forced_shutdown(praw_connection_t *rc);

pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c);
void pni_timer_free(pni_timer_t *timer);
Expand Down
3 changes: 3 additions & 0 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,9 @@ void pn_proactor_free(pn_proactor_t *p) {
case LISTENER:
listener_forced_shutdown(task_listener(tsk));
break;
case RAW_CONNECTION:
pni_raw_connection_forced_shutdown(pni_task_raw_connection(tsk));
break;
default:
break;
}
Expand Down
9 changes: 9 additions & 0 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ task_t *pni_psocket_raw_task(psocket_t* ps) {
return &containerof(ps, praw_connection_t, psocket)->task;
}

praw_connection_t *pni_task_raw_connection(task_t *t) {
return containerof(t, praw_connection_t, task);
}

psocket_t *pni_task_raw_psocket(task_t *t) {
return &containerof(t, praw_connection_t, task)->psocket;
}
Expand Down Expand Up @@ -433,3 +437,8 @@ void pni_raw_connection_done(praw_connection_t *rc) {
if (notify) notify_poller(p);
if (resume_thread) pni_resume(p, resume_thread);
}

void pni_raw_connection_forced_shutdown(praw_connection_t *rc) {
pni_raw_finalize(&rc->raw_connection);
praw_connection_cleanup(rc);
}
1 change: 1 addition & 0 deletions c/src/proactor/raw_connection-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct pn_raw_connection_t {
bool rrequestedbuffers;
bool wrequestedbuffers;

bool connectpending;
bool rpending;
bool wpending;
bool rclosedpending;
Expand Down
5 changes: 4 additions & 1 deletion c/src/proactor/raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ static inline void pni_raw_disconnect(pn_raw_connection_t *conn) {

void pni_raw_connected(pn_raw_connection_t *conn) {
pn_condition_clear(conn->condition);
pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
conn->connectpending = true;
conn->state = pni_raw_new_state(conn, conn_connected);
}

Expand Down Expand Up @@ -665,6 +665,9 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
pn_event_t *event = pn_collector_next(conn->collector);
if (event) {
return pni_log_event(conn, event);
} else if (conn->connectpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
conn->connectpending = false;
} else if (conn->wakepending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
conn->wakepending = false;
Expand Down

0 comments on commit e2fa36f

Please sign in to comment.