From 1a0e8a8c136b26cf00cdc016a8dd134dee40570f Mon Sep 17 00:00:00 2001 From: Sam Mikes Date: Tue, 9 Mar 2021 20:29:08 +0000 Subject: [PATCH] ZOOKEEPER-4210: Preserve return code from nonblocking send Async API calls attempt to flush the send buffer, which calls flush_send_queue(); and can report ZOPERATIONTIMEOUT ZSYSTEMERROR ZCONNECTIONLOSS Specifically: send_buffer() calls send(2) with MSG_NOSIGNAL, which can return EPIPE; then send_buffer return -1, causing ZCONNECTIONLOSS from flush_send_queue(). Current async API calls drop the return value from flush_send_queue(), as below: adaptor_send_queue(zh, 0); return (rc < 0)?ZMARSHALLINGERROR:ZOK; The async API then returns ZOK instead of ZCONNECTIONLOSS. Author: Sam Mikes Reviewers: Enrico Olivelli , Damien Diederen Closes #1602 from smikes/asyncsend-returncode-3.6 --- .../zookeeper-client-c/src/zookeeper.c | 84 +++++++++---------- 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c index 72a05a96991..0504a746fc7 100644 --- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c +++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c @@ -3963,6 +3963,19 @@ static int Request_path_watch_init(zhandle_t *zh, int mode, /*---------------------------------------------------------------------------* * ASYNC API *---------------------------------------------------------------------------*/ + +/* make an attempt to send queued requests immediately without blocking */ +static int nonblocking_send(zhandle_t *zh, int rc) +{ + if (adaptor_send_queue(zh, 0) < 0) { + if (zh->fd->sock != -1) { + close_zsock(zh->fd); + zh->state = ZOO_NOTCONNECTED_STATE; + } + } + return (rc < 0) ? ZMARSHALLINGERROR : ZOK; +} + int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc, const void *data) { @@ -4002,9 +4015,8 @@ int zoo_awget(zhandle_t *zh, const char *path, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc, @@ -4046,9 +4058,8 @@ int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving, @@ -4082,10 +4093,8 @@ int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving, close_buffer_oarchive(&oa, 0); LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + return nonblocking_send(zh, rc); } static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req, @@ -4128,9 +4137,8 @@ int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req, @@ -4255,9 +4263,8 @@ int zoo_acreate_ttl(zhandle_t *zh, const char *path, const char *value, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_acreate2(zhandle_t *zh, const char *path, const char *value, @@ -4322,9 +4329,8 @@ int zoo_acreate2_ttl(zhandle_t *zh, const char *path, const char *value, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req, @@ -4362,9 +4368,8 @@ int zoo_adelete(zhandle_t *zh, const char *path, int version, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_aexists(zhandle_t *zh, const char *path, int watch, @@ -4401,9 +4406,8 @@ int zoo_awexists(zhandle_t *zh, const char *path, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } static int zoo_awget_children_(zhandle_t *zh, const char *path, @@ -4434,9 +4438,8 @@ static int zoo_awget_children_(zhandle_t *zh, const char *path, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_aget_children(zhandle_t *zh, const char *path, int watch, @@ -4482,9 +4485,8 @@ static int zoo_awget_children2_(zhandle_t *zh, const char *path, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_aget_children2(zhandle_t *zh, const char *path, int watch, @@ -4525,9 +4527,8 @@ int zoo_async(zhandle_t *zh, const char *path, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } @@ -4555,9 +4556,8 @@ int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } int zoo_aset_acl(zhandle_t *zh, const char *path, int version, @@ -4586,9 +4586,8 @@ int zoo_aset_acl(zhandle_t *zh, const char *path, int version, LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + + return nonblocking_send(zh, rc); } /* Completions for multi-op results */ @@ -4747,10 +4746,8 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops, LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s", h.xid, index, zoo_get_current_server(zh)); - /* make a best (non-blocking) effort to send the requests asap */ - adaptor_send_queue(zh, 0); - return (rc < 0) ? ZMARSHALLINGERROR : ZOK; + return nonblocking_send(zh, rc); } typedef union WatchesRequest WatchesRequest; @@ -4833,7 +4830,6 @@ static int aremove_watches( zh, h.xid, COMPLETION_VOID, completion, data, 0, wdo, 0); rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); - rc = rc < 0 ? ZMARSHALLINGERROR : ZOK; leave_critical(zh); /* We queued the buffer, so don't free it */ @@ -4842,7 +4838,7 @@ static int aremove_watches( LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s", h.xid, path, zoo_get_current_server(zh)); - adaptor_send_queue(zh, 0); + rc = nonblocking_send(zh, rc); done: free_duplicate_path(server_path, path);