zebra: backpressure - Zebra push back on Buffer/Stream creation
Currently, zebra creates a pthread per client (BGP is of interest in this
case), and this thread loops in zserv_read() checking for any incoming
data. If there is some, it reads and validates it and adds it to the
ibuf_fifo, signalling the main thread to process the message. When the
main thread gets a chance, it processes the message and invokes the
function pointer registered for the header command (e.g. zserv_handlers).

Finally, if all of this was successful, the task reschedules itself and
loops in zserv_read() again.
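The dispatch-by-header-command step described above can be sketched as a small handler table. This is a minimal illustration of the pattern, not the real zserv_handlers[]; the command ids and handler names here are hypothetical.

```c
#include <stdint.h>

/* Hypothetical command ids; the real ZAPI command set is much larger. */
enum cmd { CMD_ROUTE_ADD, CMD_ROUTE_DEL, CMD_MAX };

static int routes;

static void route_add(void) { routes++; }
static void route_del(void) { routes--; }

/* Main thread: look up the handler registered for the message's
 * header command and invoke it. */
static void (*const handlers[CMD_MAX])(void) = {
	[CMD_ROUTE_ADD] = route_add,
	[CMD_ROUTE_DEL] = route_del,
};

static void process_message(enum cmd c)
{
	if (c < CMD_MAX && handlers[c])
		handlers[c]();
}
```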

However, if there are already items on the ibuf FIFO, that means zebra is
slow at processing. With the current mechanism, if the zebra main thread
is busy, the ibuf FIFO keeps growing, holding up memory.

Example "show memory zebra" output (15k streams hoarding ~160 MB of data):
--- qmem libfrr ---
Stream             :       44 variable   3432352    15042 161243800

Fix:
Client IO Thread: (zserv_read)
 - Stop scheduling read events when we know there are already X items on
   the FIFO (X = zebra zapi-packets <1-10000>, default 1000).

 - Determine the number of items on the zserv->ibuf_fifo, subtract it
   from X, and only pull that many items off the socket, so that the
   ibuf_fifo again holds at most X items.

 - If the number of items in the ibuf_fifo has reached the maximum
      * either initially when zserv_read() is called, or
      * while processing the remainder of the incoming buffer,
   the client IO thread waits to be woken by the zebra main thread.
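The budget arithmetic above reduces to one subtraction, mirroring the p2p_avail computation in the diff below. This is an illustrative sketch with assumed names, not the committed code.

```c
#include <stdint.h>

/* How many more packets the client IO thread may pull off the socket
 * before the ibuf_fifo hits the configured cap (packets_to_process).
 * A result <= 0 means: stop reading and wait for the main thread to
 * wake this pthread again. */
static int32_t read_budget(uint32_t packets_to_process, uint32_t fifo_count)
{
	return (int32_t)packets_to_process - (int32_t)fifo_count;
}
```

For example, with the default cap of 1000 and 700 items queued, the IO thread reads at most 300 more packets; lowering the cap to 500 at that point yields a non-positive budget, so reading stops until the main thread drains the FIFO.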

Main thread: (zserv_process_message)
 - In one wakeup of zserv_process_message(), at most the configured
   p2p/4 items are processed from the ibuf_fifo.
 - If the ibuf_fifo still has items on it, the main thread reschedules
   itself, ensuring that it is always processing some packets.

 - If the client ibuf has the maximum number of items, then the main
   thread, after processing p2p/4 items from the ibuf_fifo, signals the
   client IO thread. This way we ensure that the client IO thread is
   always reading from the socket buffer and adding items to the
   ibuf_fifo.
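The main-thread policy above (drain at most p2p/4 per wakeup, reschedule while work remains, wake the throttled IO thread when the FIFO was full) can be sketched as follows. Names and the struct are assumptions for illustration; the real code pops streams from a mutex-protected FIFO instead of counting.

```c
#include <stdbool.h>
#include <stdint.h>

struct sched {
	bool resched_main;   /* main thread still has queued work */
	bool wake_io_thread; /* IO thread was throttled; wake it */
};

/* One zserv_process_messages()-style wakeup: drain up to p2p/4 items
 * from a FIFO currently holding *fifo_cnt items. */
static struct sched process_once(uint32_t p2p, uint32_t *fifo_cnt)
{
	uint32_t limit = p2p / 4; /* per-wakeup processing limit */
	bool was_full = (*fifo_cnt == p2p);
	uint32_t take = *fifo_cnt < limit ? *fifo_cnt : limit;

	*fifo_cnt -= take; /* "process" the batch */

	struct sched s = {
		.resched_main = (*fifo_cnt > 0),
		.wake_io_thread = was_full,
	};
	return s;
}
```

With p2p = 1000 and a full FIFO, one wakeup drains 250 items, reschedules the main thread for the remaining 750, and signals the IO thread to resume reading.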

VTY:
 - Signal all client IO threads on the zrouter list whenever the hidden
   command "zebra zapi-packets <>" is configured/unconfigured.

Ticket: #3390099

Signed-off-by: Donald Sharp <[email protected]>
Signed-off-by: Rajasekar Raja <[email protected]>
raja-rajasekar committed Mar 6, 2024
1 parent ffab0d7 commit a24eaa4
Showing 3 changed files with 129 additions and 48 deletions.
15 changes: 15 additions & 0 deletions zebra/zebra_vty.c
@@ -3654,10 +3654,17 @@ DEFUN_HIDDEN (zebra_packet_process,
"Number of packets to process before relinquishing thread\n")
{
uint32_t packets = strtoul(argv[2]->arg, NULL, 10);
struct listnode *node = NULL;
struct zserv *client = NULL;

atomic_store_explicit(&zrouter.packets_to_process, packets,
memory_order_relaxed);

zlog_info("Setting the Zapi packet to process to %d", packets);

for (ALL_LIST_ELEMENTS_RO(zrouter.client_list, node, client))
zserv_client_event(client, ZSERV_CLIENT_READ);

return CMD_SUCCESS;
}

@@ -3669,9 +3676,17 @@ DEFUN_HIDDEN (no_zebra_packet_process,
"Zapi Protocol\n"
"Number of packets to process before relinquishing thread\n")
{
struct listnode *node = NULL;
struct zserv *client = NULL;

atomic_store_explicit(&zrouter.packets_to_process,
ZEBRA_ZAPI_PACKETS_TO_PROCESS,
memory_order_relaxed);
zlog_debug("Setting the Zapi packet to process to default value of %d",
ZEBRA_ZAPI_PACKETS_TO_PROCESS);

for (ALL_LIST_ELEMENTS_RO(zrouter.client_list, node, client))
zserv_client_event(client, ZSERV_CLIENT_READ);

return CMD_SUCCESS;
}
129 changes: 81 additions & 48 deletions zebra/zserv.c
@@ -68,19 +68,6 @@ static struct zserv *find_client_internal(uint8_t proto,
/* Mem type for zclients. */
DEFINE_MTYPE_STATIC(ZEBRA, ZSERV_CLIENT, "ZClients");

/*
* Client thread events.
*
* These are used almost exclusively by client threads to drive their own event
* loops. The only exception is in zserv_client_create(), which pushes an
* initial ZSERV_CLIENT_READ event to start the API handler loop.
*/
enum zserv_client_event {
/* Schedule a socket read */
ZSERV_CLIENT_READ,
/* Schedule a buffer write */
ZSERV_CLIENT_WRITE,
};

/*
* Main thread events.
@@ -97,23 +84,6 @@ enum zserv_event {
ZSERV_HANDLE_CLIENT_FAIL,
};

/*
* Zebra server event driver for all client threads.
*
* This is essentially a wrapper around event_add_event() that centralizes
* those scheduling calls into one place.
*
* All calls to this function schedule an event on the pthread running the
* provided client.
*
* client
* the client in question, and thread target
*
* event
* the event to notify them about
*/
static void zserv_client_event(struct zserv *client,
enum zserv_client_event event);

/*
* Zebra server event driver for the main thread.
@@ -306,22 +276,45 @@ static void zserv_write(struct event *thread)
* this task reschedules itself.
*
* Any failure in any of these actions is handled by terminating the client.
*
* The client's input buffer ibuf_fifo can have a maximum items as configured
* in the packets_to_process. Once the maximum item limit is reached,
* the client pthread expects main to signal for it to continue reading the
* incoming buffer. This way we are not filling up the FIFO more than the
* maximum when the zebra main is busy.
*/
static void zserv_read(struct event *thread)
void zserv_read(struct event *thread)
{
struct zserv *client = EVENT_ARG(thread);
int sock;
size_t already;
struct stream_fifo *cache;
uint32_t p2p_orig;

uint32_t p2p;
uint32_t p2p; /* Temp p2p used to process */
uint32_t p2p_orig; /* Configured p2p (Default-1000) */
int p2p_avail; /* How much space is available for p2p */
struct zmsghdr hdr;
size_t client_ibuf_fifo_cnt = stream_fifo_count_safe(client->ibuf_fifo);

p2p_orig = atomic_load_explicit(&zrouter.packets_to_process,
memory_order_relaxed);
p2p_avail = p2p_orig - client_ibuf_fifo_cnt;

/*
* No more packets to process. We can reach this condition in two cases.
* Case-1:
* - Say p2p configured is 1000 and we have 700 items on the ibuf_fifo
* - Configure p2p as 500. Since we are above limit, we wait for the main
* thread to pulse the client IO thread.
* Case-2:
* - p2p is constant i.e. say 1000.
* - However number of items on the ibuf_fifo has reached 1000 and main
* thread is yet to process them.
*/
if (p2p_avail <= 0)
return;

p2p = p2p_avail;
cache = stream_fifo_new();
p2p = p2p_orig;
sock = EVENT_FD(thread);

while (p2p) {
@@ -421,7 +414,7 @@ static void zserv_read(struct event *thread)
p2p--;
}

if (p2p < p2p_orig) {
if (p2p < (uint32_t)p2p_avail) {
uint64_t time_now = monotime(NULL);

/* update session statistics */
@@ -435,19 +428,42 @@
while (cache->head)
stream_fifo_push(client->ibuf_fifo,
stream_fifo_pop(cache));
/*Need to update count since main thread could have processed few*/
client_ibuf_fifo_cnt =
stream_fifo_count_safe(client->ibuf_fifo);
}

/* Schedule job to process those packets */
zserv_event(client, ZSERV_PROCESS_MESSAGES);

}

/*
* We arrive at this point in three cases.
* Case-1: Initially Client buffer is already to its max limit i.e.
* client_ibuf_fifo == p2p_orig.
* Case-2: Initially Client buffer is Not at the max limit but becomes full
* i.e. Initial client buffer had 90 elements with max (say 100).
* So we add 10 more items to fifo and reach the max limit.
* Case-3: Initially Client buffer is Not at the max limit and neither is
* it after the handling of new incoming buffer msg
* i.e. Initial client buffer had 40 elements with max (say 100).
* So now we add 20 more items to fifo.
*
* Only in case-3 we reschedule ourselves.
* In case-1 and case-2, the zserv_process_messages() (main thread)
* after processing the msgs from client input buffer (ibuf_fifo), wakes up
* the current client pthread to continue reading incoming messages.
*/
if (IS_ZEBRA_DEBUG_PACKET)
zlog_debug("Read %d packets from client: %s", p2p_orig - p2p,
zebra_route_string(client->proto));

/* Reschedule ourselves */
zserv_client_event(client, ZSERV_CLIENT_READ);
zlog_debug("Read %d packets from client: %s. Current ibuf fifo count: %zu. Conf P2p %d",
p2p_avail - p2p, zebra_route_string(client->proto),
client_ibuf_fifo_cnt, p2p_orig);

/* Case-1, Case-2, Do nothing */
if (client_ibuf_fifo_cnt != p2p_orig) {
/* Case-3: Reschedule ourselves since we have space in ibuf_fifo*/
zserv_client_event(client, ZSERV_CLIENT_READ);
}

stream_fifo_free(cache);

Expand All @@ -458,8 +474,7 @@ static void zserv_read(struct event *thread)
zserv_client_fail(client);
}

static void zserv_client_event(struct zserv *client,
enum zserv_client_event event)
void zserv_client_event(struct zserv *client, enum zserv_client_event event)
{
switch (event) {
case ZSERV_CLIENT_READ:
@@ -473,6 +488,7 @@ static void zserv_client_event(struct zserv *client,
}
}

#define PTP_ZSERV_PROCESS_LIMIT 4
/* Main thread lifecycle ---------------------------------------------------- */

/*
@@ -483,27 +499,41 @@ static void zserv_client_event(struct zserv *client,
* as the task argument.
*
* Each message is popped off the client's input queue and the action associated
* with the message is executed. This proceeds until there are no more messages,
* an error occurs, or the processing limit is reached.
* with the message is executed. This proceeds until an error occurs, or the
* processing limit is reached.
*
* The client's I/O thread can push at most zrouter.packets_to_process messages
* onto the input buffer before notifying us there are packets to read. As long
* as we always process zrouter.packets_to_process messages here, then we can
* rely on the read thread to handle queuing this task enough times to process
* everything on the input queue.
*
* In one iteration of wake up, the main thread can process at a maximum of
* configured ptp/4 items from the ibuf_fifo. If ibuf_fifo still has items in
* it, the main thread reschedules itself. This way we ensure that the main
* thread is always processing some packets.
*
* If the client ibuf has maximum items(packets_to_process) then main thread
* after processing ptp/4 items in the ibuf_fifo, it schedules a call to the
* client pthread to pump in more items on the client's input buffer fifo.
* This way we ensure that the client IO thread is always reading from socket
* buffer and adding items in the ibuf_fifo.
*/
static void zserv_process_messages(struct event *thread)
{
struct zserv *client = EVENT_ARG(thread);
struct stream *msg;
struct stream_fifo *cache = stream_fifo_new();
uint32_t p2p = zrouter.packets_to_process;
uint32_t p2p_limit = p2p / PTP_ZSERV_PROCESS_LIMIT;
bool need_resched = false;
size_t client_ibuf_fifo_cnt;

frr_with_mutex (&client->ibuf_mtx) {
client_ibuf_fifo_cnt = stream_fifo_count_safe(client->ibuf_fifo);
uint32_t i;
for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo);
++i) {
for (i = 0;
i < p2p_limit && stream_fifo_head(client->ibuf_fifo); ++i) {
msg = stream_fifo_pop(client->ibuf_fifo);
stream_fifo_push(cache, msg);
}
@@ -524,6 +554,9 @@ static void zserv_process_messages(struct event *thread)
/* Reschedule ourselves if necessary */
if (need_resched)
zserv_event(client, ZSERV_PROCESS_MESSAGES);

if (client_ibuf_fifo_cnt == p2p)
zserv_client_event(client, ZSERV_CLIENT_READ);
}

int zserv_send_message(struct zserv *client, struct stream *msg)
33 changes: 33 additions & 0 deletions zebra/zserv.h
@@ -375,6 +375,20 @@ void zserv_log_message(const char *errmsg, struct stream *msg,
/* TODO */
__attribute__((__noreturn__)) void zebra_finalize(struct event *event);

/*
* Client thread events.
*
* These are used almost exclusively by client threads to drive their own event
* loops. The only exception is in zserv_client_create(), which pushes an
* initial ZSERV_CLIENT_READ event to start the API handler loop.
*/
enum zserv_client_event {
/* Schedule a socket read */
ZSERV_CLIENT_READ,
/* Schedule a buffer write */
ZSERV_CLIENT_WRITE,
};

/*
* Graceful restart functions.
*/
@@ -384,6 +398,25 @@ extern void zebra_gr_stale_client_cleanup(struct list *client_list);
extern void zread_client_capabilities(struct zserv *client, struct zmsghdr *hdr,
struct stream *msg,
struct zebra_vrf *zvrf);
extern void zserv_read(struct event *thread);

/*
* Zebra server event driver for all client threads.
*
* This is essentially a wrapper around event_add_event() that centralizes
* those scheduling calls into one place.
*
* All calls to this function schedule an event on the pthread running the
* provided client.
*
* client
* the client in question, and thread target
*
* event
* the event to notify them about
*/
extern void zserv_client_event(struct zserv *client,
enum zserv_client_event event);

#ifdef __cplusplus
}
