Skip to content

Commit

Permalink
ZOOKEEPER-1998: Allow C client to throttle host name resolutions
Browse files Browse the repository at this point in the history
  • Loading branch information
ztzg committed Feb 3, 2020
1 parent 68e1f7d commit 7e7570c
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 16 deletions.
26 changes: 26 additions & 0 deletions zookeeper-client/zookeeper-client-c/include/zookeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,32 @@ ZOOAPI sasl_callback_t *zoo_sasl_make_basic_callbacks(const char *user,
*/
ZOOAPI int zoo_set_servers(zhandle_t *zh, const char *hosts);

/**
* \brief sets a minimum delay to observe between "routine" host name
* resolutions.
*
* The client performs regular resolutions of the list of servers
* passed to \ref zookeeper_init or set with \ref zoo_set_servers in
* order to detect changes at the DNS level.
*
* By default, it does so every time it checks for socket readiness.
* This results in low latency in the detection of changes, but can
* lead to heavy DNS traffic when the local cache is not effective.
*
* This method allows an application to influence the rate of polling.
* When delay_ms is set to a value greater than zero, the client skips
* most "routine" resolutions which would have happened in a window of
* that many milliseconds since the last succesful one.
*
* Setting delay_ms to 0 disables this logic, reverting to the default
* behavior. Setting it to -1 disables network resolutions during
* normal operation (but not, e.g., on connection loss).
*
* \param delay_ms 0, -1, or the window size in milliseconds
* \return ZOK on success or ZBADARGUMENTS for invalid input parameters
*/
ZOOAPI int zoo_set_servers_resolution_delay(zhandle_t *zh, int delay_ms);

/**
* \brief cycle to the next server on the next connection attempt.
*
Expand Down
3 changes: 3 additions & 0 deletions zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ struct _zhandle {
addrvec_t addrs_old; // old list of addresses that we are no longer connected to
addrvec_t addrs_new; // new list of addresses to connect to if we're reconfiguring

struct timeval last_resolve; // time of last hostname resolution
int resolve_delay_ms; // see zoo_set_servers_resolution_delay

int reconfig; // Are we in the process of reconfiguring cluster's ensemble
double pOld, pNew; // Probability for selecting between 'addrs_old' and 'addrs_new'
int delay;
Expand Down
84 changes: 69 additions & 15 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ typedef struct _completion_list {
} completion_list_t;

const char*err2string(int err);
static inline int calculate_interval(const struct timeval *start,
const struct timeval *end);
static int queue_session_event(zhandle_t *zh, int state);
static const char* format_endpoint_info(const struct sockaddr_storage* ep);

Expand Down Expand Up @@ -982,10 +984,14 @@ static int resolve_hosts(const zhandle_t *zh, const char *hosts_in, addrvec_t *a
*
* See zoo_cycle_next_server for the selection logic.
*
* \param ref_time an optional "reference time," used to determine if
* resolution can be skipped in accordance to the delay set by \ref
* zoo_set_servers_resolution_delay. Passing NULL prevents skipping.
*
* See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
* protocol and its evaluation,
*/
int update_addrs(zhandle_t *zh)
int update_addrs(zhandle_t *zh, const struct timeval *ref_time)
{
int rc = ZOK;
char *hosts = NULL;
Expand All @@ -1006,26 +1012,54 @@ int update_addrs(zhandle_t *zh)
return ZSYSTEMERROR;
}

// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
lock_reconfig(zh);

// Check if we are due for a host name resolution. (See
// zoo_set_servers_resolution_delay. The answer is always "yes"
// if no reference is provided or the file descriptor is invalid.)
if (ref_time && zh->fd->sock != -1) {
int do_resolve;

if (zh->resolve_delay_ms <= 0) {
// -1 disables, 0 means unconditional. Fail safe.
do_resolve = zh->resolve_delay_ms != -1;
} else {
int elapsed_ms = calculate_interval(&zh->last_resolve, ref_time);
// Include < 0 in case of overflow, or if we are not
// backed by a monotonic clock.
do_resolve = elapsed_ms > zh->resolve_delay_ms || elapsed_ms < 0;
}

if (!do_resolve) {
goto finish;
}
}

// Copy zh->hostname for local use
hosts = strdup(zh->hostname);
if (hosts == NULL) {
rc = ZSYSTEMERROR;
goto fail;
goto finish;
}

rc = resolve_hosts(zh, hosts, &resolved);
if (rc != ZOK)
{
goto fail;
goto finish;
}

// Unconditionally note last resolution time.
if (ref_time) {
zh->last_resolve = *ref_time;
} else {
get_system_time(&zh->last_resolve);
}

// If the addrvec list is identical to last time we ran don't do anything
if (addrvec_eq(&zh->addrs, &resolved))
{
goto fail;
goto finish;
}

// Is the server we're connected to in the new resolved list?
Expand All @@ -1045,14 +1079,14 @@ int update_addrs(zhandle_t *zh)
rc = addrvec_append(&zh->addrs_old, resolved_address);
if (rc != ZOK)
{
goto fail;
goto finish;
}
}
else {
rc = addrvec_append(&zh->addrs_new, resolved_address);
if (rc != ZOK)
{
goto fail;
goto finish;
}
}
}
Expand Down Expand Up @@ -1105,13 +1139,13 @@ int update_addrs(zhandle_t *zh)
zh->state = ZOO_NOTCONNECTED_STATE;
}

fail:
finish:

unlock_reconfig(zh);

// If we short-circuited out and never assigned resolved to zh->addrs then we
// need to free resolved to avoid a memleak.
if (zh->addrs.data != resolved.data)
if (resolved.data && zh->addrs.data != resolved.data)
{
addrvec_free(&resolved);
}
Expand Down Expand Up @@ -1308,7 +1342,7 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
if (zh->hostname == 0) {
goto abort;
}
if(update_addrs(zh) != 0) {
if(update_addrs(zh, NULL) != 0) {
goto abort;
}

Expand Down Expand Up @@ -1402,7 +1436,7 @@ int zoo_set_servers(zhandle_t *zh, const char *hosts)
return ZBADARGUMENTS;
}

// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
lock_reconfig(zh);

// Reset hostname to new set of hosts to connect to
Expand All @@ -1414,7 +1448,27 @@ int zoo_set_servers(zhandle_t *zh, const char *hosts)

unlock_reconfig(zh);

return update_addrs(zh);
return update_addrs(zh, NULL);
}

/*
* Sets a minimum delay to observe between "routine" host name
* resolutions. See prototype for full documentation.
*/
int zoo_set_servers_resolution_delay(zhandle_t *zh, int delay_ms) {
if (delay_ms < -1) {
LOG_ERROR(LOGCALLBACK(zh), "Resolution delay cannot be %d", delay_ms);
return ZBADARGUMENTS;
}

// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
lock_reconfig(zh);

zh->resolve_delay_ms = delay_ms;

unlock_reconfig(zh);

return ZOK;
}

/**
Expand Down Expand Up @@ -1479,7 +1533,7 @@ static int get_next_server_in_reconfig(zhandle_t *zh)
*/
void zoo_cycle_next_server(zhandle_t *zh)
{
// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
lock_reconfig(zh);

memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
Expand Down Expand Up @@ -1511,7 +1565,7 @@ const char* zoo_get_current_server(zhandle_t* zh)
{
const char *endpoint_info = NULL;

// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
// Need the lock here as it is changed in update_addrs()
lock_reconfig(zh);

Expand Down Expand Up @@ -2413,7 +2467,7 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
}
api_prolog(zh);

rc = update_addrs(zh);
rc = update_addrs(zh, &now);
if (rc != ZOK) {
return api_epilog(zh, rc);
}
Expand Down
74 changes: 73 additions & 1 deletion zookeeper-client/zookeeper-client-c/tests/TestClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
CPPUNIT_TEST(testWatcherAutoResetWithLocal);
CPPUNIT_TEST(testGetChildren2);
CPPUNIT_TEST(testLastZxid);
CPPUNIT_TEST(testServersResolutionDelay);
CPPUNIT_TEST(testRemoveWatchers);
#endif
CPPUNIT_TEST_SUITE_END();
Expand Down Expand Up @@ -381,7 +382,78 @@ class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
CPPUNIT_ASSERT(zh->io_count < 2);
zookeeper_close(zh);
}


/* Checks the zoo_set_servers_resolution_delay default and operation */
void testServersResolutionDelay() {
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
int rc;
struct timeval tv;
struct Stat stat;

CPPUNIT_ASSERT(zk);
CPPUNIT_ASSERT(zk->resolve_delay_ms == 0);

// a) Default/0 case: resolve at each request.

tv = zk->last_resolve;
usleep(10000); // 10ms

rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

// Must have changed because of the request.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec != tv.tv_sec ||
zk->last_resolve.tv_usec != tv.tv_usec);

// b) Disabled/-1 case: never perform "routine" resolutions.

rc = zoo_set_servers_resolution_delay(zk, -1); // Disabled
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

tv = zk->last_resolve;
usleep(10000); // 10ms

rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

// Must not have changed as auto-resolution is disabled.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec == tv.tv_sec &&
zk->last_resolve.tv_usec == tv.tv_usec);

// c) Invalid delay is rejected.

rc = zoo_set_servers_resolution_delay(zk, -1000); // Bad
CPPUNIT_ASSERT_EQUAL((int)ZBADARGUMENTS, rc);

// d) Valid delay, no resolution within window.

rc = zoo_set_servers_resolution_delay(zk, 500); // 0.5s
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

tv = zk->last_resolve;
usleep(10000); // 10ms

rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

// Must not have changed because the request (hopefully!)
// executed in less than 0.5s.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec == tv.tv_sec &&
zk->last_resolve.tv_usec == tv.tv_usec);

// e) Valid delay, at least one resolution after delay.

usleep(500 * 1000); // 0.5s

rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

// Must have changed because we waited 0.5s between the
// capture and the last request.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec != tv.tv_sec ||
zk->last_resolve.tv_usec != tv.tv_usec);
}

void testPing()
{
Expand Down

0 comments on commit 7e7570c

Please sign in to comment.