diff --git a/common/notify.c b/common/notify.c index bbd79e8..6ffd00c 100644 --- a/common/notify.c +++ b/common/notify.c @@ -53,8 +53,13 @@ typedef struct notify_thr { int thrid; } notify_thr_t; +typedef struct notify_pollfd { + int evict; +} notify_pollfd_t; + typedef struct notify_poll_ctx { int n_pfds; + notify_pollfd_t npfds[MAX_NOTIFY_POLL_FDS]; struct pollfd pfds[MAX_NOTIFY_POLL_FDS]; } notify_poll_ctx_t; @@ -102,8 +107,9 @@ notify_conv4mpoll_events(short events) type |= NOTI_TYPE_HUP; } if (events & (POLLERR|POLLNVAL)) { - type |= NOTI_TYPE_ERROR; + type |= NOTI_TYPE_ERROR; } + return type; } @@ -206,13 +212,15 @@ notify_add_ent(void *ctx, int fd, notify_type_t type, void *priv) pctx->pfds[pctx->n_pfds].fd = fd; pctx->pfds[pctx->n_pfds].events = events; + pctx->npfds[pctx->n_pfds].evict = 0; nctx->n_fds++; pctx->n_pfds++; + //log_trace("notify - add fd %d tslot %d %d:%d", fd, tslot, nctx->n_fds, pctx->n_pfds); + NOTI_UNLOCK(nctx); - //log_debug("notify - add fd %d tslot %d", fd, tslot); return 0; } @@ -224,6 +232,9 @@ notify_delete_ent__(void *ctx, int fd) notify_ent_t *ent; notify_ent_t *pent; notify_poll_ctx_t *pctx; + int poll_slot; + int tslot; + void *priv; assert(ctx); @@ -231,18 +242,36 @@ notify_delete_ent__(void *ctx, int fd) return -EINVAL; } + NOTI_LOCK(nctx); ent = &nctx->earr[fd]; if (ent->fd <= 0) { + NOTI_UNLOCK(nctx); return -ENOENT; } - if (ent->poll_slot < 0) { + if (ent->poll_slot < 0 || ent->poll_slot >= MAX_NOTIFY_POLL_FDS) { + NOTI_UNLOCK(nctx); assert(0); } - pctx = &nctx->poll_ctx[ent->thr_id]; + priv = ent->priv; + poll_slot = ent->poll_slot; + tslot = ent->thr_id; - for (i = ent->poll_slot; i < pctx->n_pfds - 1; i++) { + if (tslot < 0 || tslot >= MAX_NOTIFY_THREADS) { + NOTI_UNLOCK(nctx); + assert(0); + } + + ent->fd = -1; + ent->type = 0; + ent->poll_slot = -1; + ent->priv = NULL; + ent->thr_id = 0; + + pctx = &nctx->poll_ctx[tslot]; + + for (i = poll_slot; i < pctx->n_pfds - 1; i++) { pent = NULL; if (pctx->pfds[i+1].fd > 0 && pctx->pfds[i+1].fd < MAX_NOTIFY_FDS) { @@ -251,39 +280,93 @@ notify_delete_ent__(void *ctx, int fd) pctx->pfds[i].fd = pctx->pfds[i+1].fd; pctx->pfds[i].events = pctx->pfds[i+1].events; + pctx->npfds[i].evict = 0; if (pent) { pent->poll_slot = i; } } - //log_debug("notify del fd %d tslot %d", fd, ent->thr_id); + nctx->n_fds--; + pctx->n_pfds--; - ent->fd = -1; - ent->type = 0; - ent->poll_slot = -1; - if (ent->priv) { + //log_trace("notify del fd %d tslot %d %d:%d", fd, tslot, nctx->n_fds, pctx->n_pfds); + + NOTI_UNLOCK(nctx); + + if (priv) { if (nctx->cbs.pdestroy) { - nctx->cbs.pdestroy(ent->priv); + nctx->cbs.pdestroy(priv); } - ent->priv = NULL; } - nctx->n_fds--; - pctx->n_pfds--; + return 0; +} + +static int +notify_delete_ent_evict__(void *ctx, int fd) +{ + notify_ctx_t *nctx = ctx; + notify_ent_t *ent; + notify_ent_t *pent; + notify_poll_ctx_t *pctx; + int poll_slot; + int tslot; + + assert(ctx); + + if (fd <= 0 || fd >= MAX_NOTIFY_FDS) { + return -EINVAL; + } + + NOTI_LOCK(nctx); + ent = &nctx->earr[fd]; + if (ent->fd <= 0) { + NOTI_UNLOCK(nctx); + assert(0); + return -ENOENT; + } + + if (ent->poll_slot < 0) { + NOTI_UNLOCK(nctx); + assert(0); + } + + tslot = ent->thr_id; + if (tslot < 0 || tslot >= MAX_NOTIFY_THREADS) { + NOTI_UNLOCK(nctx); + assert(0); + } + + poll_slot = ent->poll_slot; + if (poll_slot < 0 || poll_slot >= MAX_NOTIFY_POLL_FDS) { + NOTI_UNLOCK(nctx); + assert(0); + } + + pctx = &nctx->poll_ctx[tslot]; + if (pctx->pfds[poll_slot].fd <= 0) { + NOTI_UNLOCK(nctx); + assert(0); + } + + pctx->npfds[poll_slot].evict = 1; + NOTI_UNLOCK(nctx); return 0; } int -notify_delete_ent(void *ctx, int fd) +notify_delete_ent(void *ctx, int fd, int evict) { int rc; notify_ctx_t *nctx = ctx; - NOTI_LOCK(nctx); - rc = notify_delete_ent__(ctx, fd); - NOTI_UNLOCK(nctx); + if (evict) { + rc = notify_delete_ent_evict__(ctx, fd); + } else { + rc = notify_delete_ent__(ctx, fd); + } return rc; } @@ -294,10 +377,11 @@ notify_run(void *ctx, int thread) int rc = 0; int nproc = 0; int i = 0; + size_t parr_sz; int n_pfds = 0; void *priv = NULL; char estr[128];; - struct pollfd pfds[MAX_NOTIFY_POLL_FDS]; + struct pollfd *pfds; notify_ent_t *ent; notify_ctx_t *nctx = ctx; @@ -307,24 +391,42 @@ notify_run(void *ctx, int thread) assert(0); } + parr_sz = MAX_NOTIFY_POLL_FDS*sizeof(struct pollfd); + pfds = calloc(1, MAX_NOTIFY_POLL_FDS*sizeof(struct pollfd)); + assert(pfds); + while(1) { /* This is seemingly expensive operation */ NOTI_LOCK(nctx); - memcpy(pfds, nctx->poll_ctx[thread].pfds, sizeof(pfds)); + memcpy(pfds, nctx->poll_ctx[thread].pfds, parr_sz); n_pfds = nctx->poll_ctx[thread].n_pfds; NOTI_UNLOCK(nctx); nproc = 0; rc = poll(pfds, n_pfds, MAX_NOTIFY_POLL_TIMEO); if (rc < 0) { - log_error("poll :error(%s)", strerror_r(errno, estr, sizeof(estr))); + log_error("notify:poll:error(%s)", strerror_r(errno, estr, sizeof(estr))); usleep(200*1000); continue; } if (rc == 0) { - continue; + int evict = 0; + for (i = 0; i < n_pfds; i++) { + NOTI_LOCK(nctx); + if (nctx->poll_ctx[thread].npfds[i].evict && + nctx->poll_ctx[thread].pfds[i].fd > 0) { + evict = 1; + pfds[i].revents = POLLERR; + } + NOTI_UNLOCK(nctx); + } + + if (!evict) { + //log_trace("notify:poll:timeout (n_pfds %d)", n_pfds); + continue; + } } for (i = 0 ; i < n_pfds; i++) { @@ -335,6 +437,7 @@ notify_run(void *ctx, int thread) } if (fd <= 0 || fd >= MAX_NOTIFY_FDS) { + log_trace("notify:poll:fd invaild (n_pfds %d)", n_pfds); continue; } @@ -342,6 +445,8 @@ notify_run(void *ctx, int thread) ent = &nctx->earr[fd]; if (ent->fd <= 0) { NOTI_UNLOCK(nctx); + log_error("notify:poll:ent fd %d invalid (n_pfds %d)", fd, n_pfds); + notify_delete_ent__(nctx, fd); continue; } priv = ent->priv; @@ -355,7 +460,7 @@ notify_run(void *ctx, int thread) } if (type & (NOTI_TYPE_HUP|NOTI_TYPE_ERROR)) { - //log_debug("notify hup %d", fd); + //log_trace("notify:hup %d", fd); notify_delete_ent__(nctx, fd); } nproc++; diff --git a/common/notify.h b/common/notify.h index eeb5ec1..db9e6b3 100644 --- a/common/notify.h +++ b/common/notify.h @@ -20,7 +20,7 @@ typedef struct notify_cbs { } notify_cbs_t ; int notify_check_slot(void *ctx, int fd); -int notify_delete_ent(void *ctx, int fd); +int notify_delete_ent(void *ctx, int fd, int evict); int notify_add_ent(void *ctx, int fd, notify_type_t type, void *priv); int notify_start(void *ctx); void *notify_ctx_new(notify_cbs_t *cbs, int n_thrs); diff --git a/common/sockproxy.c b/common/sockproxy.c index 68471d4..8bf5ca8 100644 --- a/common/sockproxy.c +++ b/common/sockproxy.c @@ -123,7 +123,7 @@ typedef struct llb_sockmap_key smap_key_t; static proxy_struct_t *proxy_struct; -#define HAVE_PROXY_MAPFD +//#define HAVE_PROXY_MAPFD #ifdef HAVE_PROXY_MAPFD static int fd_in_use(int fd) @@ -371,15 +371,15 @@ proxy_xmit_cache(proxy_fd_ent_t *ent) return 0; case SSL_ERROR_WANT_WRITE: PROXY_ENT_CUNLOCK(ent); - log_trace("ssl-want-wr %s", - ERR_error_string(ERR_get_error(), NULL)); + //log_trace("ssl-want-wr %s", + // ERR_error_string(ERR_get_error(), NULL)); notify_add_ent(proxy_struct->ns, ent->fd, NOTI_TYPE_IN|NOTI_TYPE_HUP|NOTI_TYPE_OUT, ent); return -1; case SSL_ERROR_WANT_READ: PROXY_ENT_CUNLOCK(ent); - log_trace("ssl-want-rd %s", - ERR_error_string(ERR_get_error(), NULL)); + //log_trace("ssl-want-rd %s", + // ERR_error_string(ERR_get_error(), NULL)); return -1; case SSL_ERROR_SYSCALL: case SSL_ERROR_SSL: @@ -392,8 +392,8 @@ proxy_xmit_cache(proxy_fd_ent_t *ent) log_trace("ssl-wr-zero-ret %s", ERR_error_string(ERR_get_error(), NULL)); default: - log_trace("ssl-err-ret %s", - ERR_error_string(ERR_get_error(), NULL)); + //log_trace("ssl-err-ret %s", + // ERR_error_string(ERR_get_error(), NULL)); SSL_shutdown(ent->ssl); PROXY_ENT_CUNLOCK(ent); return -1; @@ -409,12 +409,14 @@ proxy_xmit_cache(proxy_fd_ent_t *ent) if (tmp) free(tmp); } + + ent->cache_head = NULL; + PROXY_ENT_CUNLOCK(ent); + if (rstev) { notify_add_ent(proxy_struct->ns, ent->fd, NOTI_TYPE_IN|NOTI_TYPE_HUP, ent); } - ent->cache_head = NULL; - PROXY_ENT_CUNLOCK(ent); return 0; } @@ -681,15 +683,12 @@ proxy_ssl_connect(int fd, void *ssl) int err; int ssl_err; int sret; - struct timeval tv = { .tv_sec = 0, - .tv_usec = 500000 - }; + struct pollfd pfds = { 0 }; assert(ssl); SSL_set_fd(ssl, fd); - FD_ZERO(&fds); - FD_SET(fd, &fds); + pfds.fd = fd; while (to--) { err = SSL_connect(ssl); @@ -699,12 +698,14 @@ proxy_ssl_connect(int fd, void *ssl) ssl_err = SSL_get_error(ssl, err); if (ssl_err == SSL_ERROR_WANT_READ) { - sret = select(fd + 1, &fds, NULL, NULL, &tv); + pfds.events = POLLIN; + sret = poll(&pfds, 1, 500); if (sret == -1) { return -1; } } else if (ssl_err == SSL_ERROR_WANT_WRITE) { - sret = select(fd + 1, NULL, &fds, NULL, &tv); + pfds.events = POLLOUT; + sret = poll(&pfds, 1, 500); if (sret == -1) { return -1; } @@ -725,10 +726,7 @@ proxy_setup_ep_connect(uint32_t epip, uint16_t epport, uint8_t protocol, { int fd, rc; struct sockaddr_in epaddr; - fd_set wrdy, errors; - struct timeval tv = { .tv_sec = 0, - .tv_usec = 500000 - }; + struct pollfd pfds = { 0 }; memset(&epaddr, 0, sizeof(epaddr)); epaddr.sin_family = AF_INET; @@ -751,15 +749,12 @@ proxy_setup_ep_connect(uint32_t epip, uint16_t epport, uint8_t protocol, return -1; } - FD_ZERO(&wrdy); - FD_SET(fd, &wrdy); + pfds.fd = fd; + pfds.events = POLLOUT|POLLERR; - FD_ZERO(&errors); - FD_SET(fd, &errors); - - rc = select(fd + 1, NULL, &wrdy, &errors, &tv); - if (rc <= 0) { - log_error("connect select %s:%u(%s)", inet_ntoa(*(struct in_addr *)(&epip)), ntohs(epport), strerror(errno)); + rc = poll(&pfds, 1, 500); + if (rc < 0) { + log_error("connect poll %s:%u(%s)", inet_ntoa(*(struct in_addr *)(&epip)), ntohs(epport), strerror(errno)); close(fd); return -1; } @@ -770,7 +765,7 @@ proxy_setup_ep_connect(uint32_t epip, uint16_t epport, uint8_t protocol, return -1; } - if (FD_ISSET(fd, &errors)) { + if (pfds.revents & POLLERR) { log_error("connect %s:%u(errors)", inet_ntoa(*(struct in_addr *)(&epip)), ntohs(epport)); close(fd); return -1; @@ -1282,7 +1277,7 @@ proxy_delete_entry(proxy_ent_t *ent, proxy_arg_t *arg) PROXY_UNLOCK(); if (fd > 0) { - notify_delete_ent(proxy_struct->ns, fd); + notify_delete_ent(proxy_struct->ns, fd, 0); close(fd); } @@ -1509,7 +1504,7 @@ proxy_release_fd_ctx(proxy_fd_ent_t *fd_ent, int reset) shutdown(fd_ent->fd, SHUT_RDWR); if (reset) { - log_trace("fd %d reset", fd_ent->fd); + log_trace("sockproxy fd %d reset", fd_ent->fd); proxy_reset_fd_list(fd_ent->head, fd_ent); close(fd_ent->fd); fd_ent->fd = -1; @@ -1518,6 +1513,8 @@ proxy_release_fd_ctx(proxy_fd_ent_t *fd_ent, int reset) fd_ent->ssl = NULL; } } + } else { + assert(0); } } @@ -1530,8 +1527,9 @@ proxy_release_rfd_ctx(proxy_fd_ent_t *pfe) fd_ent = pfe->rfd_ent[i]; if (fd_ent) { PROXY_ENT_LOCK(fd_ent); - log_trace("rfd %d release", fd_ent->fd); + log_trace("sockproxy rfd %d release", fd_ent->fd); proxy_release_fd_ctx(fd_ent, 0); + notify_delete_ent(proxy_struct->ns, fd_ent->fd, 1); pfe->rfd_ent[i] = NULL; for (int j = 0; j < fd_ent->n_rfd; j++) { fd_ent->rfd_ent[j] = NULL; @@ -1552,11 +1550,14 @@ proxy_pdestroy(void *priv) proxy_fd_ent_t *fd_ent; int is_listener = 0; + assert(pfe); + PROXY_LOCK(); if (pfe) { PROXY_ENT_LOCK(pfe); ent = pfe->head; if (!ent) { + assert(0); PROXY_ENT_UNLOCK(pfe); proxy_try_free_fd_ctx(pfe); PROXY_UNLOCK(); @@ -1575,14 +1576,12 @@ proxy_pdestroy(void *priv) } fd_ent = fd_ent->next; } - } /*else { - proxy_release_rfd_ctx(pfe); - }*/ + } - proxy_release_fd_ctx(pfe, 1); if (!is_listener) { proxy_release_rfd_ctx(pfe); } + proxy_release_fd_ctx(pfe, 1); PROXY_ENT_UNLOCK(pfe); proxy_try_free_fd_ctx(pfe); @@ -1704,8 +1703,8 @@ proxy_sock_read_err(proxy_fd_ent_t *pfe, int rval) return 1; case SSL_ERROR_ZERO_RETURN: default: - log_trace("ssl-err %s", - ERR_error_string(ERR_get_error(), NULL)); + //log_trace("ssl-err %s", + // ERR_error_string(ERR_get_error(), NULL)); SSL_shutdown(pfe->ssl); shutdown(pfe->fd, SHUT_RDWR); return -1; @@ -1719,17 +1718,12 @@ proxy_sock_read_err(proxy_fd_ent_t *pfe, int rval) static int proxy_ssl_accept(void *ssl, int fd) { - struct timeval tv; - fd_set fds; + struct pollfd pfds = { 0 }; int n_try = 0; int sel_rc; int ssl_rc; - tv.tv_sec = 0; - tv.tv_usec = 1000; - - FD_ZERO(&fds); - FD_SET(fd, &fds); + pfds.fd = fd; for (n_try = 0; n_try < 10; n_try++) { if ((ssl_rc = SSL_accept(ssl)) > 0) { @@ -1745,12 +1739,14 @@ proxy_ssl_accept(void *ssl, int fd) case SSL_ERROR_WANT_READ: log_error("ssl-accept want-read %s", ERR_error_string(ERR_get_error(), NULL)); - sel_rc = select(fd + 1, &fds, NULL, NULL, &tv); + pfds.events = POLLIN; + sel_rc = poll(&pfds, 1, 30); break; case SSL_ERROR_WANT_WRITE: log_error("ssl-accept want-write %s", ERR_error_string(ERR_get_error(), NULL)); - sel_rc = select(fd + 1, NULL, &fds, NULL, &tv); + pfds.events = POLLOUT; + sel_rc = poll(&pfds, 1, 30); break; default: log_error("ssl-accept failed %s", @@ -1983,7 +1979,7 @@ handle_url(llhttp_t *parser, const char *at, size_t length) #endif static int -proxy_notifer(int fd, notify_type_t type, void *priv) +proxy_notifier(int fd, notify_type_t type, void *priv) { struct llb_sockmap_key key = { 0 }; struct llb_sockmap_key rkey = { 0 }; @@ -2011,7 +2007,7 @@ proxy_notifer(int fd, notify_type_t type, void *priv) return 0; } - log_trace("notify fd = %d(%d) type 0x%x", fd, pfe->fd, type); + //log_trace("notify fd = %d(%d) type 0x%x", fd, pfe->fd, type); restart: while (type) { @@ -2143,7 +2139,7 @@ proxy_notifer(int fd, notify_type_t type, void *priv) } if (setup_proxy_path(&key, &rkey, pfe, phurl)) { - log_error("proxy setup failed %d", fd); + log_trace("proxy setup failed %d", fd); goto restart; } } @@ -2176,7 +2172,7 @@ proxy_main(sockmap_cb_t sockmap_cb) { int startfd = PROXY_START_MAPFD; notify_cbs_t cbs = { 0 }; - cbs.notify = proxy_notifer; + cbs.notify = proxy_notifier; cbs.pdestroy = proxy_pdestroy; proxy_struct = calloc(sizeof(proxy_struct_t), 1);