Skip to content

Commit

Permalink
Prototype code: limit output to session window capacity.
Browse files Browse the repository at this point in the history
Debug only - do not merge.
  • Loading branch information
kgiusti committed Jan 18, 2024
1 parent 9c9ee19 commit dda5351
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 39 deletions.
9 changes: 8 additions & 1 deletion include/qpid/dispatch/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,19 @@ void qd_link_set_link_id(qd_link_t *link, uint64_t link_id);
struct qd_message_t;
void qd_link_set_incoming_msg(qd_link_t *link, struct qd_message_t *msg);

qd_session_t *qd_session(pn_session_t *pn_ssn);
qd_session_t *qd_session(qd_connection_t *conn, pn_session_t *pn_ssn);
qd_connection_t *qd_session_connection(const qd_session_t *qd_ssn);
void qd_session_cleanup(qd_connection_t *qd_conn);
void qd_session_free(qd_session_t *qd_ssn);
bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn);

size_t qd_session_outgoing_buffer_capacity(const qd_session_t *qd_ssn);
qd_session_t *qd_link_session(const qd_link_t *link);
pn_connection_t *qd_connection_pn_conn(const qd_connection_t *qd_conn);
bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
bool qd_link_is_q3_blocked(const qd_link_t *link);

void qd_connection_log_policy_denial(qd_link_t *link, const char *text);


Expand Down
90 changes: 84 additions & 6 deletions src/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct qd_link_t {
DEQ_LINKS(qd_link_t);
pn_session_t *pn_sess;
pn_link_t *pn_link;
qd_session_t *qd_session; // TODO: kag fix me
qd_direction_t direction;
void *context;
qd_node_t *node;
Expand All @@ -80,6 +81,8 @@ ALLOC_DEFINE(qd_link_ref_t);
struct qd_session_t {
DEQ_LINKS(qd_session_t);
pn_session_t *pn_sess;
qd_connection_t *qd_conn; // parent;
uint32_t max_frame_size; // in bytes
qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty
};

Expand Down Expand Up @@ -146,6 +149,8 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
DEQ_INSERT_TAIL(container->links, link);
sys_mutex_unlock(&container->lock);
link->pn_sess = pn_link_session(pn_link);
link->qd_session = qd_session_from_pn(link->pn_sess); // kag fixme
assert(link->qd_session);
link->pn_link = pn_link;
link->direction = QD_OUTGOING;
link->context = 0;
Expand Down Expand Up @@ -183,7 +188,9 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, u
sys_mutex_lock(&container->lock);
DEQ_INSERT_TAIL(container->links, link);
sys_mutex_unlock(&container->lock);
link->pn_sess = pn_link_session(pn_link);
link->pn_sess = pn_link_session(pn_link);
link->qd_session = qd_session_from_pn(link->pn_sess); // kag fixme
assert(link->qd_session);
link->pn_link = pn_link;
link->direction = QD_INCOMING;
link->context = 0;
Expand Down Expand Up @@ -344,6 +351,7 @@ static void cleanup_link(qd_link_t *link)
link->pn_link = 0;
}
link->pn_sess = 0;
link->qd_session = 0; // kag fixme

// cleanup any inbound message that has not been forwarded
qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg);
Expand Down Expand Up @@ -534,7 +542,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
// remote created new session
assert(qd_session_from_pn(ssn) == 0);
qd_session_t *qd_ssn = qd_session(ssn);
qd_session_t *qd_ssn = qd_session(qd_conn, ssn);
if (!qd_ssn) {
pn_condition_t *cond = pn_session_condition(ssn);
pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR);
Expand All @@ -551,6 +559,13 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_session_link_pn(qd_ssn, ssn);
qd_policy_apply_session_settings(ssn, qd_conn);

//
pn_sequence_t iwin = pni_session_get_remote_incoming_window(ssn);
qd_log(LOG_CONTAINER, QD_LOG_INFO,
"qd_session: %p opened, incoming window = %lu", (void *)qd_ssn, (unsigned long)iwin);
//

pn_session_open(ssn);
}
}
Expand All @@ -568,8 +583,10 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
while (pn_link) {
if (pn_link_session(pn_link) == ssn) {
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link)
if (qd_link) {
qd_link->pn_link = 0;
qd_link->qd_session = 0; // kag fix me
}
}
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
Expand Down Expand Up @@ -623,8 +640,10 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
}

if (qd_link)
if (qd_link) {
qd_link->pn_link = 0;
qd_link->qd_session = 0;
}
}
pn_link = pn_link_next(pn_link, 0);

Expand Down Expand Up @@ -932,14 +951,15 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node)
qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name, qd_session_class_t ssn_class)
{
const qd_server_config_t * cf = qd_connection_config(conn);
qd_session_t *qd_ssn = 0;

pn_session_t *pn_ssn = conn->pn_sessions[ssn_class];
if (!pn_ssn) {
pn_ssn = pn_session(qd_connection_pn(conn));
if (!pn_ssn) {
return NULL;
}
qd_session_t *qd_ssn = qd_session(pn_ssn);
qd_ssn = qd_session(conn, pn_ssn);
if (!qd_ssn) {
pn_session_free(pn_ssn);
return NULL; // PN_LOCAL_CLOSE event will free pn_ssn
Expand All @@ -948,7 +968,10 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
conn->pn_sessions[ssn_class] = pn_ssn;
pn_session_set_incoming_capacity(pn_ssn, cf->incoming_capacity);
pn_session_open(pn_ssn);
} else {
qd_ssn = pn_session_get_context(pn_ssn);
}
assert(qd_ssn);

qd_link_t *link = new_qd_link_t();
if (!link) {
Expand All @@ -961,6 +984,7 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
sys_mutex_unlock(&node->container->lock);

link->pn_sess = pn_ssn;
link->qd_session = qd_ssn;

if (dir == QD_OUTGOING)
link->pn_link = pn_sender(link->pn_sess, name);
Expand Down Expand Up @@ -1111,6 +1135,11 @@ void *qd_link_get_node_context(const qd_link_t *link)
return (link && link->node) ? link->node->context : 0;
}

bool qd_link_is_q3_blocked(const qd_link_t *link)
{
assert(link);
return link->q3_blocked;
}

/** sending link has entered Q3 flow control */
void qd_link_q3_block(qd_link_t *link)
Expand Down Expand Up @@ -1149,18 +1178,27 @@ void qd_link_set_link_id(qd_link_t *link, uint64_t link_id)
}


qd_session_t *qd_session(pn_session_t *pn_ssn)
qd_session_t *qd_session(qd_connection_t *conn, pn_session_t *pn_ssn)
{
assert(pn_ssn);
assert(conn);
pn_transport_t *tport = pn_connection_transport(conn->pn_conn);
assert(tport);

qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn);
if (!qd_ssn) {
qd_ssn = new_qd_session_t();
if (qd_ssn) {
ZERO(qd_ssn);
qd_ssn->pn_sess = pn_ssn;
qd_ssn->qd_conn = conn;
qd_ssn->max_frame_size = pn_transport_get_max_frame(tport);
assert(qd_ssn->max_frame_size);
DEQ_INIT(qd_ssn->q3_blocked_links);
qd_session_link_pn(qd_ssn, pn_ssn);
}
} else {
assert(qd_ssn->qd_conn == conn);
}
return qd_ssn;
}
Expand Down Expand Up @@ -1212,6 +1250,46 @@ void qd_session_cleanup(qd_connection_t *qd_conn)
}
}

// Get the maximum number of buffers that can be written to the session.
// If zero is returned the session outgoing window is blocked.
//
// Why return the number of buffers rather than the number of bytes?? Because some output paths (unicast cut through)
// cannot write parital buffers to the session. For these output paths having a session window > 0 but < QD_BUFFER_SIZE
// the session window is effectively closed. Keep life simple and consider a session window < QD_BUFFER_SIZE as closed.
//
size_t qd_session_outgoing_buffer_capacity(const qd_session_t *qd_ssn)
{
assert(qd_ssn);
#ifndef NDEBUG
// ensure this thread can access the session
CHECK_PROACTOR_CONNECTION(pn_session_connection(qd_ssn->pn_sess));
#endif
size_t used = pn_session_outgoing_bytes(qd_ssn->pn_sess);
size_t available = pni_session_get_remote_incoming_window(qd_ssn->pn_sess) * qd_ssn->max_frame_size;

if (available > used) {
return (available - used) / QD_BUFFER_SIZE;
}
return 0;
}

qd_connection_t *qd_session_connection(const qd_session_t *qd_ssn)
{
assert(qd_ssn);
return qd_ssn->qd_conn;
}

pn_connection_t *qd_connection_pn_conn(const qd_connection_t *qd_conn)
{
assert(qd_conn);
return qd_conn->pn_conn;
}

qd_session_t *qd_link_session(const qd_link_t *qd_link)
{
assert(qd_link);
return qd_link->qd_session;
}

void qd_link_set_incoming_msg(qd_link_t *link, qd_message_t *msg)
{
Expand Down
Loading

0 comments on commit dda5351

Please sign in to comment.