Skip to content

Commit

Permalink
Handle error in nonblocking send
Browse files Browse the repository at this point in the history
In a nonblocking send, we may be notified that the
connection was lost.  Currently this is not handled because
the return value of flush_send_queue() is ignored.

This change DOES NOT change the ABI or the return
codes that are returned by zoo_a* async functions.

This change DOES close the file descriptor and
change zh->state to reflect that the connection was lost.

Reorganized to minimize the change in behavior. Always
call adaptor_send_queue() even if there was a marshalling
error.
  • Loading branch information
smikes committed Feb 24, 2021
1 parent 8c68933 commit 548004f
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
Original file line number Diff line number Diff line change
@@ -3961,6 +3961,17 @@ static int Request_path_watch_init(zhandle_t *zh, int mode,
/*---------------------------------------------------------------------------*
* ASYNC API
*---------------------------------------------------------------------------*/
static int nonblocking_send(zhandle_t *zh, int rc)
{
if (adaptor_send_queue(zh, 0) < 0) {
if (is_connected(zh)) {
close_zsock(zh->fd);
zh->state = ZOO_NOTCONNECTED_STATE;
}
}
return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
}

int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
const void *data)
{
@@ -4000,9 +4011,9 @@ int zoo_awget(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
@@ -4044,9 +4055,9 @@ int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
@@ -4080,10 +4091,10 @@ int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
close_buffer_oarchive(&oa, 0);

LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return nonblocking_send(zh, rc);

return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}

static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
@@ -4126,9 +4137,9 @@ int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
@@ -4253,9 +4264,9 @@ int zoo_acreate_ttl(zhandle_t *zh, const char *path, const char *value,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
@@ -4320,9 +4331,9 @@ int zoo_acreate2_ttl(zhandle_t *zh, const char *path, const char *value,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
@@ -4360,10 +4371,9 @@ int zoo_adelete(zhandle_t *zh, const char *path, int version,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
return nonblocking_send(zh, rc);}

int zoo_aexists(zhandle_t *zh, const char *path, int watch,
stat_completion_t sc, const void *data)
@@ -4399,9 +4409,10 @@ int zoo_awexists(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));


/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

static int zoo_awget_children_(zhandle_t *zh, const char *path,
@@ -4432,9 +4443,9 @@ static int zoo_awget_children_(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
@@ -4480,9 +4491,9 @@ static int zoo_awget_children2_(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int zoo_aget_children2(zhandle_t *zh, const char *path, int watch,
@@ -4523,9 +4534,9 @@ int zoo_async(zhandle_t *zh, const char *path,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}


@@ -4553,9 +4564,9 @@ int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
@@ -4584,9 +4595,9 @@ int zoo_aset_acl(zhandle_t *zh, const char *path, int version,

LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
zoo_get_current_server(zh));

/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
return nonblocking_send(zh, rc);
}

/* Completions for multi-op results */
@@ -4745,10 +4756,9 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,

LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
h.xid, index, zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);

return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
/* make a best (non-blocking) effort to send the requests asap */
return nonblocking_send(zh, rc);
}

typedef union WatchesRequest WatchesRequest;
@@ -4840,7 +4850,7 @@ static int aremove_watches(
LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
h.xid, path, zoo_get_current_server(zh));

adaptor_send_queue(zh, 0);
rc = nonblocking_send(zh, rc);

done:
free_duplicate_path(server_path, path);

0 comments on commit 548004f

Please sign in to comment.