Skip to content

Commit

Permalink
proxy: scaling fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
TrekkieCoder committed Oct 10, 2024
1 parent 75e909b commit 67b98ed
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 75 deletions.
151 changes: 128 additions & 23 deletions common/notify.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ typedef struct notify_thr {
int thrid;
} notify_thr_t;

typedef struct notify_pollfd {
int evict;
} notify_pollfd_t;

typedef struct notify_poll_ctx {
int n_pfds;
notify_pollfd_t npfds[MAX_NOTIFY_POLL_FDS];
struct pollfd pfds[MAX_NOTIFY_POLL_FDS];
} notify_poll_ctx_t;

Expand Down Expand Up @@ -102,8 +107,9 @@ notify_conv4mpoll_events(short events)
type |= NOTI_TYPE_HUP;
}
if (events & (POLLERR|POLLNVAL)) {
type |= NOTI_TYPE_ERROR;
type |= NOTI_TYPE_ERROR;
}

return type;
}

Expand Down Expand Up @@ -206,13 +212,15 @@ notify_add_ent(void *ctx, int fd, notify_type_t type, void *priv)

pctx->pfds[pctx->n_pfds].fd = fd;
pctx->pfds[pctx->n_pfds].events = events;
pctx->npfds[pctx->n_pfds].evict = 0;

nctx->n_fds++;
pctx->n_pfds++;

//log_trace("notify - add fd %d tslot %d %d:%d", fd, tslot, nctx->n_fds, pctx->n_pfds);

NOTI_UNLOCK(nctx);

//log_debug("notify - add fd %d tslot %d", fd, tslot);
return 0;
}

Expand All @@ -224,25 +232,46 @@ notify_delete_ent__(void *ctx, int fd)
notify_ent_t *ent;
notify_ent_t *pent;
notify_poll_ctx_t *pctx;
int poll_slot;
int tslot;
void *priv;

assert(ctx);

if (fd <= 0 || fd >= MAX_NOTIFY_FDS) {
return -EINVAL;
}

NOTI_LOCK(nctx);
ent = &nctx->earr[fd];
if (ent->fd <= 0) {
NOTI_UNLOCK(nctx);
return -ENOENT;
}

if (ent->poll_slot < 0) {
if (ent->poll_slot < 0 || ent->poll_slot >= MAX_NOTIFY_POLL_FDS) {
NOTI_UNLOCK(nctx);
assert(0);
}

pctx = &nctx->poll_ctx[ent->thr_id];
priv = ent->priv;
poll_slot = ent->poll_slot;
tslot = ent->thr_id;

for (i = ent->poll_slot; i < pctx->n_pfds - 1; i++) {
if (tslot < 0 || tslot >= MAX_NOTIFY_THREADS) {
NOTI_UNLOCK(nctx);
assert(0);
}

ent->fd = -1;
ent->type = 0;
ent->poll_slot = -1;
ent->priv = NULL;
ent->thr_id = 0;

pctx = &nctx->poll_ctx[tslot];

for (i = poll_slot; i < pctx->n_pfds - 1; i++) {

pent = NULL;
if (pctx->pfds[i+1].fd > 0 && pctx->pfds[i+1].fd < MAX_NOTIFY_FDS) {
Expand All @@ -251,39 +280,93 @@ notify_delete_ent__(void *ctx, int fd)

pctx->pfds[i].fd = pctx->pfds[i+1].fd;
pctx->pfds[i].events = pctx->pfds[i+1].events;
pctx->npfds[i].evict = 0;

if (pent) {
pent->poll_slot = i;
}
}

//log_debug("notify del fd %d tslot %d", fd, ent->thr_id);
nctx->n_fds--;
pctx->n_pfds--;

ent->fd = -1;
ent->type = 0;
ent->poll_slot = -1;
if (ent->priv) {
//log_trace("notify del fd %d tslot %d %d:%d", fd, tslot, nctx->n_fds, pctx->n_pfds);

NOTI_UNLOCK(nctx);

if (priv) {
if (nctx->cbs.pdestroy) {
nctx->cbs.pdestroy(ent->priv);
nctx->cbs.pdestroy(priv);
}
ent->priv = NULL;
}

nctx->n_fds--;
pctx->n_pfds--;
return 0;
}

static int
notify_delete_ent_evict__(void *ctx, int fd)
{
notify_ctx_t *nctx = ctx;
notify_ent_t *ent;
notify_ent_t *pent;
notify_poll_ctx_t *pctx;
int poll_slot;
int tslot;

assert(ctx);

if (fd <= 0 || fd >= MAX_NOTIFY_FDS) {
return -EINVAL;
}

NOTI_LOCK(nctx);
ent = &nctx->earr[fd];
if (ent->fd <= 0) {
NOTI_UNLOCK(nctx);
assert(0);
return -ENOENT;
}

if (ent->poll_slot < 0) {
NOTI_UNLOCK(nctx);
assert(0);
}

tslot = ent->thr_id;
if (tslot < 0 || tslot >= MAX_NOTIFY_THREADS) {
NOTI_UNLOCK(nctx);
assert(0);
}

poll_slot = ent->poll_slot;
if (poll_slot < 0 || poll_slot >= MAX_NOTIFY_POLL_FDS) {
NOTI_UNLOCK(nctx);
assert(0);
}

pctx = &nctx->poll_ctx[tslot];
if (pctx->pfds[poll_slot].fd <= 0) {
NOTI_UNLOCK(nctx);
assert(0);
}

pctx->npfds[poll_slot].evict = 1;
NOTI_UNLOCK(nctx);

return 0;
}

int
notify_delete_ent(void *ctx, int fd)
notify_delete_ent(void *ctx, int fd, int evict)
{
int rc;
notify_ctx_t *nctx = ctx;

NOTI_LOCK(nctx);
rc = notify_delete_ent__(ctx, fd);
NOTI_UNLOCK(nctx);
if (evict) {
rc = notify_delete_ent_evict__(ctx, fd);
} else {
rc = notify_delete_ent__(ctx, fd);
}

return rc;
}
Expand All @@ -294,10 +377,11 @@ notify_run(void *ctx, int thread)
int rc = 0;
int nproc = 0;
int i = 0;
size_t parr_sz;
int n_pfds = 0;
void *priv = NULL;
char estr[128];;
struct pollfd pfds[MAX_NOTIFY_POLL_FDS];
struct pollfd *pfds;
notify_ent_t *ent;
notify_ctx_t *nctx = ctx;

Expand All @@ -307,24 +391,42 @@ notify_run(void *ctx, int thread)
assert(0);
}

parr_sz = MAX_NOTIFY_POLL_FDS*sizeof(struct pollfd);
pfds = calloc(1, MAX_NOTIFY_POLL_FDS*sizeof(struct pollfd));
assert(pfds);

while(1) {

/* This is seemingly expensive operation */
NOTI_LOCK(nctx);
memcpy(pfds, nctx->poll_ctx[thread].pfds, sizeof(pfds));
memcpy(pfds, nctx->poll_ctx[thread].pfds, parr_sz);
n_pfds = nctx->poll_ctx[thread].n_pfds;
NOTI_UNLOCK(nctx);

nproc = 0;
rc = poll(pfds, n_pfds, MAX_NOTIFY_POLL_TIMEO);
if (rc < 0) {
log_error("poll :error(%s)", strerror_r(errno, estr, sizeof(estr)));
log_error("notify:poll:error(%s)", strerror_r(errno, estr, sizeof(estr)));
usleep(200*1000);
continue;
}

if (rc == 0) {
continue;
int evict = 0;
for (i = 0; i < n_pfds; i++) {
NOTI_LOCK(nctx);
if (nctx->poll_ctx[thread].npfds[i].evict &&
nctx->poll_ctx[thread].pfds[i].fd > 0) {
evict = 1;
pfds[i].revents = POLLERR;
}
NOTI_UNLOCK(nctx);
}

if (!evict) {
//log_trace("notify:poll:timeout (n_pfds %d)", n_pfds);
continue;
}
}

for (i = 0 ; i < n_pfds; i++) {
Expand All @@ -335,13 +437,16 @@ notify_run(void *ctx, int thread)
}

if (fd <= 0 || fd >= MAX_NOTIFY_FDS) {
log_trace("notify:poll:fd invaild (n_pfds %d)", n_pfds);
continue;
}

NOTI_LOCK(nctx);
ent = &nctx->earr[fd];
if (ent->fd <= 0) {
NOTI_UNLOCK(nctx);
log_error("notify:poll:ent fd %d invalid (n_pfds %d)", fd, n_pfds);
notify_delete_ent__(nctx, fd);
continue;
}
priv = ent->priv;
Expand All @@ -355,7 +460,7 @@ notify_run(void *ctx, int thread)
}

if (type & (NOTI_TYPE_HUP|NOTI_TYPE_ERROR)) {
//log_debug("notify hup %d", fd);
//log_trace("notify:hup %d", fd);
notify_delete_ent__(nctx, fd);
}
nproc++;
Expand Down
2 changes: 1 addition & 1 deletion common/notify.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ typedef struct notify_cbs {
} notify_cbs_t ;

int notify_check_slot(void *ctx, int fd);
int notify_delete_ent(void *ctx, int fd);
int notify_delete_ent(void *ctx, int fd, int evict);
int notify_add_ent(void *ctx, int fd, notify_type_t type, void *priv);
int notify_start(void *ctx);
void *notify_ctx_new(notify_cbs_t *cbs, int n_thrs);
Expand Down
Loading

0 comments on commit 67b98ed

Please sign in to comment.