Skip to content

Commit

Permalink
ZOOKEEPER-4210: Preserve return code from nonblocking send
Browse files Browse the repository at this point in the history
Async API calls attempt to flush the send buffer, which
calls flush_send_queue(); and can report
  ZOPERATIONTIMEOUT
  ZSYSTEMERROR
  ZCONNECTIONLOSS

Specifically: send_buffer() calls send(2) with MSG_NOSIGNAL,
which can return EPIPE; then send_buffer return -1, causing
ZCONNECTIONLOSS from flush_send_queue().

Current async API calls drop the return value from flush_send_queue(),
as below:

    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;

The async API then returns ZOK instead of ZCONNECTIONLOSS.

Author: Sam Mikes <[email protected]>

Reviewers: Enrico Olivelli <[email protected]>, Damien Diederen <[email protected]>

Closes apache#1602 from smikes/asyncsend-returncode-3.6
  • Loading branch information
smikes authored and RokLenarcic committed Sep 3, 2022
1 parent e92d9cb commit fac4ca5
Showing 1 changed file with 40 additions and 44 deletions.
84 changes: 40 additions & 44 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -3963,6 +3963,19 @@ static int Request_path_watch_init(zhandle_t *zh, int mode,
/*---------------------------------------------------------------------------*
* ASYNC API
*---------------------------------------------------------------------------*/

/* make an attempt to send queued requests immediately without blocking */
static int nonblocking_send(zhandle_t *zh, int rc)
{
if (adaptor_send_queue(zh, 0) < 0) {
if (zh->fd->sock != -1) {
close_zsock(zh->fd);
zh->state = ZOO_NOTCONNECTED_STATE;
}
}
return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
}

int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
const void *data)
{
Expand Down Expand Up @@ -4002,9 +4015,8 @@ int zoo_awget(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
Expand Down Expand Up @@ -4046,9 +4058,8 @@ int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
Expand Down Expand Up @@ -4082,10 +4093,8 @@ int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
close_buffer_oarchive(&oa, 0);

LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);

return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
Expand Down Expand Up @@ -4128,9 +4137,8 @@ int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
Expand Down Expand Up @@ -4255,9 +4263,8 @@ int zoo_acreate_ttl(zhandle_t *zh, const char *path, const char *value,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
Expand Down Expand Up @@ -4322,9 +4329,8 @@ int zoo_acreate2_ttl(zhandle_t *zh, const char *path, const char *value,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
Expand Down Expand Up @@ -4362,9 +4368,8 @@ int zoo_adelete(zhandle_t *zh, const char *path, int version,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_aexists(zhandle_t *zh, const char *path, int watch,
Expand Down Expand Up @@ -4401,9 +4406,8 @@ int zoo_awexists(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

static int zoo_awget_children_(zhandle_t *zh, const char *path,
Expand Down Expand Up @@ -4434,9 +4438,8 @@ static int zoo_awget_children_(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
Expand Down Expand Up @@ -4482,9 +4485,8 @@ static int zoo_awget_children2_(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_aget_children2(zhandle_t *zh, const char *path, int watch,
Expand Down Expand Up @@ -4525,9 +4527,8 @@ int zoo_async(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}


Expand Down Expand Up @@ -4555,9 +4556,8 @@ int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
Expand Down Expand Up @@ -4586,9 +4586,8 @@ int zoo_aset_acl(zhandle_t *zh, const char *path, int version,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;

return nonblocking_send(zh, rc);
}

/* Completions for multi-op results */
Expand Down Expand Up @@ -4747,10 +4746,8 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,

LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
h.xid, index, zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);

return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
return nonblocking_send(zh, rc);
}

typedef union WatchesRequest WatchesRequest;
Expand Down Expand Up @@ -4833,7 +4830,6 @@ static int aremove_watches(
zh, h.xid, COMPLETION_VOID, completion, data, 0, wdo, 0);
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
leave_critical(zh);

/* We queued the buffer, so don't free it */
Expand All @@ -4842,7 +4838,7 @@ static int aremove_watches(
LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
h.xid, path, zoo_get_current_server(zh));

adaptor_send_queue(zh, 0);
rc = nonblocking_send(zh, rc);

done:
free_duplicate_path(server_path, path);
Expand Down

0 comments on commit fac4ca5

Please sign in to comment.