From 40443246cf9f01c7c50e5a5738d1ed5d958480e8 Mon Sep 17 00:00:00 2001 From: Tom Klonikowski Date: Fri, 7 Oct 2011 17:14:00 +0200 Subject: [PATCH] ZOOKEEPER-1112: Add support for C client for SASL authentication Patch #3 3rd patch * includes a sasl-enabled cli as additional make artifacts When using the test configuration as in tests/jaas.digest.server.conf you can login via: cli_sasl_st -u super -h zk-sasl-md5 -m DIGEST-MD5 hostlist and password 'test'. If you set up Kerberos 5 on the server side and have a valid ticket its just: cli_sasl_st -m GSSAPI hostlist (Forward-ported from ticket attachment ZOOKEEPER-1112_3.patch by Damien Diederen.) --- .../zookeeper-client-c/Makefile.am | 16 + .../zookeeper-client-c/src/cli_sasl.c | 813 ++++++++++++++++++ 2 files changed, 829 insertions(+) create mode 100644 zookeeper-client/zookeeper-client-c/src/cli_sasl.c diff --git a/zookeeper-client/zookeeper-client-c/Makefile.am b/zookeeper-client/zookeeper-client-c/Makefile.am index 36b51b0e7f6..0c9a04ed69d 100644 --- a/zookeeper-client/zookeeper-client-c/Makefile.am +++ b/zookeeper-client/zookeeper-client-c/Makefile.am @@ -74,6 +74,14 @@ cli_st_SOURCES = src/cli.c cli_st_LDADD = libzookeeper_st.la cli_st_CFLAGS = $(SASL_CFLAGS) +if WANT_SASL +bin_PROGRAMS += cli_sasl_st + +cli_sasl_st_SOURCES = src/cli_sasl.c +cli_sasl_st_LDADD = libzookeeper_st.la +cli_sasl_st_CFLAGS = $(SASL_CFLAGS) +endif + if WANT_SYNCAPI bin_PROGRAMS += cli_mt load_gen @@ -85,6 +93,14 @@ load_gen_SOURCES = src/load_gen.c load_gen_LDADD = libzookeeper_mt.la load_gen_CFLAGS = -DTHREADED $(SASL_CFLAGS) +if WANT_SASL +bin_PROGRAMS += cli_sasl_mt + +cli_sasl_mt_SOURCES = src/cli_sasl.c +cli_sasl_mt_LDADD = libzookeeper_mt.la +cli_sasl_mt_CFLAGS = -DTHREADED $(SASL_CFLAGS) +endif + endif ######################################################################### diff --git a/zookeeper-client/zookeeper-client-c/src/cli_sasl.c b/zookeeper-client/zookeeper-client-c/src/cli_sasl.c new file mode 100644 index 00000000000..04e32e99aa2 --- /dev/null +++ b/zookeeper-client/zookeeper-client-c/src/cli_sasl.c @@ -0,0 +1,813 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#ifndef WIN32 +#include +#include +#include +#else +#include "winport.h" +//#include <-- can't include, conflicting definitions of close() +int read(int _FileHandle, void * _DstBuf, unsigned int _MaxCharCount); +int write(int _Filehandle, const void * _Buf, unsigned int _MaxCharCount); +#define ctime_r(tctime, buffer) ctime_s (buffer, 40, tctime) +#endif + +#include +#include +#include + +#ifdef YCA +#include +#endif + +#ifdef SASL +#include +#endif + +#define _LL_CAST_ (long long) + +static zhandle_t *zh; +static clientid_t myid; +static const char *clientIdFile = 0; +struct timeval startTime; +static char cmd[1024]; +static int batchMode=0; + +static int to_send=0; +static int sent=0; +static int recvd=0; + +static int shutdownThisThing=0; + +#ifdef SASL +zoo_sasl_conn_t *my_sasl_conn = NULL; +char *service = "zookeeper"; +char *host; +char *mech; +char *user; +char *realm; +#endif + +void processline(char *line); + +#ifdef SASL +static int getrealm(void *context __attribute__((unused)), int id, + const char **availrealms, const char **result) { + *result = realm; + return SASL_OK; +} + +static int simple(void *context __attribute__((unused)), int id, + const char **result, unsigned *len) { + /* paranoia check */ + if (!result) + return SASL_BADPARAM; + + switch (id) { + case SASL_CB_USER: + *result = user; + break; + case SASL_CB_AUTHNAME: + *result = user; + break; + default: + return SASL_BADPARAM; + } + + return SASL_OK; +} + +#ifndef HAVE_GETPASSPHRASE +static char * +getpassphrase(const char *prompt) { + return getpass(prompt); +} +#endif /* ! HAVE_GETPASSPHRASE */ + +static int getsecret(sasl_conn_t *conn, void *context __attribute__((unused)), + int id, sasl_secret_t **psecret) { + char *password; + size_t len; + static sasl_secret_t *x; + + /* paranoia check */ + if (!conn || !psecret || id != SASL_CB_PASS + ) + return SASL_BADPARAM; + + password = getpassphrase("Password: "); + if (!password) + return SASL_FAIL; + + len = strlen(password); + + x = (sasl_secret_t *) realloc(x, sizeof(sasl_secret_t) + len); + + if (!x) { + memset(password, 0, len); + return SASL_NOMEM; + } + + x->len = len; + strcpy((char *) x->data, password); + memset(password, 0, len); + + *psecret = x; + return SASL_OK; +} + +/* callbacks we support */ +sasl_callback_t callbacks[] = { + { SASL_CB_GETREALM, &getrealm, NULL }, + { SASL_CB_USER, &simple, NULL }, + { SASL_CB_AUTHNAME, &simple, NULL }, + { SASL_CB_PASS, &getsecret, NULL }, + { SASL_CB_LIST_END, NULL, NULL } }; +#endif + +static __attribute__ ((unused)) void +printProfileInfo(struct timeval start, struct timeval end, int thres, + const char* msg) +{ + int delay=(end.tv_sec*1000+end.tv_usec/1000)- + (start.tv_sec*1000+start.tv_usec/1000); + if(delay>thres) + fprintf(stderr,"%s: execution time=%dms\n",msg,delay); +} + +static const char* state2String(int state){ + if (state == 0) + return "CLOSED_STATE"; + if (state == ZOO_CONNECTING_STATE) + return "CONNECTING_STATE"; + if (state == ZOO_ASSOCIATING_STATE) + return "ASSOCIATING_STATE"; + if (state == ZOO_CONNECTED_STATE) + return "CONNECTED_STATE"; + if (state == ZOO_EXPIRED_SESSION_STATE) + return "EXPIRED_SESSION_STATE"; + if (state == ZOO_AUTH_FAILED_STATE) + return "AUTH_FAILED_STATE"; + + return "INVALID_STATE"; +} + +void watcher(zhandle_t *zzh, int type, int state, const char *path, + void* context) +{ + /* Be careful using zh here rather than zzh - as this may be mt code + * the client lib may call the watcher before zookeeper_init returns */ + + fprintf(stderr, "Watcher %d state = %s", type, state2String(state)); + if (path && strlen(path) > 0) { + fprintf(stderr, " for path %s", path); + } + fprintf(stderr, "\n"); + + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_CONNECTED_STATE) { + const clientid_t *id = zoo_client_id(zzh); + if (myid.client_id == 0 || myid.client_id != id->client_id) { + myid = *id; + fprintf(stderr, "Got a new session id: 0x%llx\n", + _LL_CAST_ myid.client_id); + if (clientIdFile) { + FILE *fh = fopen(clientIdFile, "w"); + if (!fh) { + perror(clientIdFile); + } else { + int rc = fwrite(&myid, sizeof(myid), 1, fh); + if (rc != sizeof(myid)) { + perror("writing client id"); + } + fclose(fh); + } + } +#ifdef SASL + const char *mechs; + int mechlen; + + if(mech) { + if(strcmp("GSSAPI", mech)==0 || (user && host)) { + zoo_sasl_connect(zzh, "zookeeper", host, &my_sasl_conn, &mechs, &mechlen); + fprintf(stderr, "Mechs [%d]: %s\n", mechlen, mechs); +#ifdef THREADED + zoo_sasl_authenticate(zh, my_sasl_conn, mech, mechs); +#else + zoo_asasl_authenticate(zh, my_sasl_conn, mech, mechs); +#endif + } else { + fprintf(stderr, "Mechanism %s requires username (-u) and host (-h zk-sasl-md5) to be specified\n", mech); + } + } +#endif + if(batchMode) { + processline(cmd); + shutdownThisThing = 1; + } + } + } else if (state == ZOO_AUTH_FAILED_STATE) { + fprintf(stderr, "Authentication failure. Shutting down...\n"); + zookeeper_close(zzh); + shutdownThisThing=1; + zh=0; + } else if (state == ZOO_EXPIRED_SESSION_STATE) { + fprintf(stderr, "Session expired. Shutting down...\n"); + zookeeper_close(zzh); + shutdownThisThing=1; + zh=0; + } + } +} + +void dumpStat(const struct Stat *stat) { + char tctimes[40]; + char tmtimes[40]; + time_t tctime; + time_t tmtime; + + if (!stat) { + fprintf(stderr,"null\n"); + return; + } + tctime = stat->ctime/1000; + tmtime = stat->mtime/1000; + + ctime_r(&tmtime, tmtimes); + ctime_r(&tctime, tctimes); + + fprintf(stderr, "\tctime = %s\tczxid=%llx\n" + "\tmtime=%s\tmzxid=%llx\n" + "\tversion=%x\taversion=%x\n" + "\tephemeralOwner = %llx\n", + tctimes, _LL_CAST_ stat->czxid, tmtimes, + _LL_CAST_ stat->mzxid, + (unsigned int)stat->version, (unsigned int)stat->aversion, + _LL_CAST_ stat->ephemeralOwner); +} + +void my_string_completion(int rc, const char *name, const void *data) { + fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc); + if (!rc) { + fprintf(stderr, "\tname = %s\n", name); + } + if(batchMode) + shutdownThisThing=1; +} + +void my_data_completion(int rc, const char *value, int value_len, + const struct Stat *stat, const void *data) { + struct timeval tv; + int sec; + int usec; + gettimeofday(&tv, 0); + sec = tv.tv_sec - startTime.tv_sec; + usec = tv.tv_usec - startTime.tv_usec; + fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000); + fprintf(stderr, "%s: rc = %d\n", (char*)data, rc); + if (value) { + fprintf(stderr, " value_len = %d\n", value_len); + assert(write(2, value, value_len) == value_len); + } + fprintf(stderr, "\nStat:\n"); + dumpStat(stat); + free((void*)data); + if(batchMode) + shutdownThisThing=1; +} + +void my_silent_data_completion(int rc, const char *value, int value_len, + const struct Stat *stat, const void *data) { + recvd++; + fprintf(stderr, "Data completion %s rc = %d\n",(char*)data,rc); + free((void*)data); + if (recvd==to_send) { + fprintf(stderr,"Recvd %d responses for %d requests sent\n",recvd,to_send); + if(batchMode) + shutdownThisThing=1; + } +} + +void my_strings_completion(int rc, const struct String_vector *strings, + const void *data) { + struct timeval tv; + int sec; + int usec; + int i; + + gettimeofday(&tv, 0); + sec = tv.tv_sec - startTime.tv_sec; + usec = tv.tv_usec - startTime.tv_usec; + fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000); + fprintf(stderr, "%s: rc = %d\n", (char*)data, rc); + if (strings) + for (i=0; i < strings->count; i++) { + fprintf(stderr, "\t%s\n", strings->data[i]); + } + free((void*)data); + gettimeofday(&tv, 0); + sec = tv.tv_sec - startTime.tv_sec; + usec = tv.tv_usec - startTime.tv_usec; + fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000); + if(batchMode) + shutdownThisThing=1; +} + +void my_strings_stat_completion(int rc, const struct String_vector *strings, + const struct Stat *stat, const void *data) { + my_strings_completion(rc, strings, data); + dumpStat(stat); + if(batchMode) + shutdownThisThing=1; +} + +void my_void_completion(int rc, const void *data) { + fprintf(stderr, "%s: rc = %d\n", (char*)data, rc); + free((void*)data); + if(batchMode) + shutdownThisThing=1; +} + +void my_stat_completion(int rc, const struct Stat *stat, const void *data) { + fprintf(stderr, "%s: rc = %d Stat:\n", (char*)data, rc); + dumpStat(stat); + free((void*)data); + if(batchMode) + shutdownThisThing=1; +} + +void my_silent_stat_completion(int rc, const struct Stat *stat, + const void *data) { + // fprintf(stderr, "State completion: [%s] rc = %d\n", (char*)data, rc); + sent++; + free((void*)data); +} + +static void sendRequest(const char* data) { + zoo_aset(zh, "/od", data, strlen(data), -1, my_silent_stat_completion, + strdup("/od")); + zoo_aget(zh, "/od", 1, my_silent_data_completion, strdup("/od")); +} + +void od_completion(int rc, const struct Stat *stat, const void *data) { + int i; + fprintf(stderr, "od command response: rc = %d Stat:\n", rc); + dumpStat(stat); + // send a whole bunch of requests + recvd=0; + sent=0; + to_send=200; + for (i=0; i\n"); + fprintf(stderr, " delete \n"); + fprintf(stderr, " set \n"); + fprintf(stderr, " get \n"); + fprintf(stderr, " ls \n"); + fprintf(stderr, " ls2 \n"); + fprintf(stderr, " sync \n"); + fprintf(stderr, " exists \n"); + fprintf(stderr, " myid\n"); + fprintf(stderr, " verbose\n"); + fprintf(stderr, " addauth \n"); + fprintf(stderr, " quit\n"); + fprintf(stderr, "\n"); + fprintf(stderr, " prefix the command with the character 'a' to run the command asynchronously.\n"); + fprintf(stderr, " run the 'verbose' command to toggle verbose logging.\n"); + fprintf(stderr, " i.e. 'aget /foo' to get /foo asynchronously\n"); + } else if (startsWith(line, "verbose")) { + if (verbose) { + verbose = 0; + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + fprintf(stderr, "logging level set to WARN\n"); + } else { + verbose = 1; + zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG); + fprintf(stderr, "logging level set to DEBUG\n"); + } + } else if (startsWith(line, "get ")) { + line += 4; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + + rc = zoo_aget(zh, line, 1, my_data_completion, strdup(line)); + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "set ")) { + char *ptr; + line += 4; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + ptr = strchr(line, ' '); + if (!ptr) { + fprintf(stderr, "No data found after path\n"); + return; + } + *ptr = '\0'; + ptr++; + if (async) { + rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion, + strdup(line)); + } else { +#ifdef THREADED + struct Stat stat; + rc = zoo_set2(zh, line, ptr, strlen(ptr), -1, &stat); +#else + rc = ZSYSTEMERROR; +#endif + } + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "ls ")) { + line += 3; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + gettimeofday(&startTime, 0); + rc= zoo_aget_children(zh, line, 1, my_strings_completion, strdup(line)); + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "ls2 ")) { + line += 4; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + gettimeofday(&startTime, 0); + rc= zoo_aget_children2(zh, line, 1, my_strings_stat_completion, strdup(line)); + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "create ")) { + int flags = 0; + line += 7; + if (line[0] == '+') { + line++; + if (line[0] == 'e') { + flags |= ZOO_EPHEMERAL; + line++; + } + if (line[0] == 's') { + flags |= ZOO_SEQUENCE; + line++; + } + line++; + } + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + fprintf(stderr, "Creating [%s] node\n", line); +// { +// struct ACL _CREATE_ONLY_ACL_ACL[] = {{ZOO_PERM_CREATE, ZOO_ANYONE_ID_UNSAFE}}; +// struct ACL_vector CREATE_ONLY_ACL = {1,_CREATE_ONLY_ACL_ACL}; +// rc = zoo_acreate(zh, line, "new", 3, &CREATE_ONLY_ACL, flags, +// my_string_completion, strdup(line)); +// } + rc = zoo_acreate(zh, line, "new", 3, &ZOO_OPEN_ACL_UNSAFE, flags, + my_string_completion, strdup(line)); + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "delete ")) { + line += 7; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + if (async) { + rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line)); + } else { +#ifdef THREADED + rc = zoo_delete(zh, line, -1); +#else + rc = ZSYSTEMERROR; +#endif + } + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "sync ")) { + line += 5; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } + rc = zoo_async(zh, line, my_string_completion, strdup(line)); + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (startsWith(line, "exists ")) { +#ifdef THREADED + struct Stat stat; +#endif + line += 7; + if (line[0] != '/') { + fprintf(stderr, "Path must start with /, found: %s\n", line); + return; + } +#ifndef THREADED + rc = zoo_aexists(zh, line, 1, my_stat_completion, strdup(line)); +#else + rc = zoo_exists(zh, line, 1, &stat); +#endif + if (rc) { + fprintf(stderr, "Error %d for %s\n", rc, line); + } + } else if (strcmp(line, "myid") == 0) { + printf("session Id = %llx\n", _LL_CAST_ zoo_client_id(zh)->client_id); + } else if (strcmp(line, "reinit") == 0) { + zookeeper_close(zh); + // we can't send myid to the server here -- zookeeper_close() removes + // the session on the server. We must start anew. + zh = zookeeper_init(hostPort, watcher, 30000, 0, 0, 0); + } else if (startsWith(line, "quit")) { + fprintf(stderr, "Quitting...\n"); + shutdownThisThing=1; + } else if (startsWith(line, "od")) { + const char val[]="fire off"; + fprintf(stderr, "Overdosing...\n"); + rc = zoo_aset(zh, "/od", val, sizeof(val)-1, -1, od_completion, 0); + if (rc) + fprintf(stderr, "od command failed: %d\n", rc); + } else if (startsWith(line, "addauth ")) { + char *ptr; + line += 8; + ptr = strchr(line, ' '); + if (ptr) { + *ptr = '\0'; + ptr++; + } + zoo_add_auth(zh, line, ptr, ptr ? strlen(ptr)-1 : 0, NULL, NULL); + } +} + +static int usage(char **argv) { +#ifdef SASL + const char *SASL_FEATURE = " (with sasl support)"; +#else + const char *SASL_FEATURE = ""; +#endif + fprintf(stderr, + "USAGE %s [-u sasluser -m saslmech] [-r saslrealm] [-i clientIdFile] [-c cmd] zookeeper_host_list\n", + argv[0]); + fprintf(stderr, + "Version: ZooKeeper cli (c client) version %d.%d.%d%s\n", + ZOO_MAJOR_VERSION, + ZOO_MINOR_VERSION, + ZOO_PATCH_VERSION, + SASL_FEATURE); + return 2; +} + +int main(int argc, char **argv) { +#ifndef THREADED + fd_set rfds, wfds, efds; + int processed=0; +#endif + char buffer[4096]; + char p[2048]; +#ifdef YCA + char *cert=0; + char appId[64]; +#endif + int bufoff = 0; + FILE *fh; + int c; +#ifdef SASL + while ((c = getopt(argc, argv, "u:h:i:s:m:c:r:")) != EOF) { + switch(c) { + case 'u': + user = optarg; + break; + + case 's': + service = optarg; + break; + + case 'h': + host = optarg; + break; + + case 'm': + mech = optarg; + break; + + case 'r': + realm = optarg; + break; +#else + while ((c = getopt(argc, argv, "i:c:")) != EOF) { + switch(c) { +#endif + case 'i': + clientIdFile = optarg; + fh = fopen(clientIdFile, "r"); + if (fh) { + if (fread(&myid, sizeof(myid), 1, fh) != sizeof(myid)) { + memset(&myid, 0, sizeof(myid)); + } + fclose(fh); + } + break; + + case 'c': + strcpy(cmd,optarg); + batchMode = 1; + break; + + default: + return usage(argv); + break; + } + } + + if (optind > argc - 1) { + return usage(argv); + } + if (optind == argc - 1) { + hostPort = argv[optind]; + } +#ifdef YCA + strcpy(appId,"yahoo.example.yca_test"); + cert = yca_get_cert_once(appId); + if(cert!=0) { + fprintf(stderr,"Certificate for appid [%s] is [%s]\n",appId,cert); + strncpy(p,cert,sizeof(p)-1); + free(cert); + } else { + fprintf(stderr,"Certificate for appid [%s] not found\n",appId); + strcpy(p,"dummy"); + } +#else + strcpy(p, "dummy"); +#endif + verbose = 1; + + zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG); + zoo_deterministic_conn_order(1); // enable deterministic order + zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0); + if (!zh) { + return errno; + } + +#ifdef SASL + if(zoo_sasl_init(zh, callbacks)!=ZOK) { + return 1; + } +#endif + +#ifdef YCA + if(zoo_add_auth(zh,"yca",p,strlen(p),0,0)!=ZOK) + return 2; +#endif + +#ifdef THREADED + while(!shutdownThisThing) { + int rc; + int len = sizeof(buffer) - bufoff -1; + if (len <= 0) { + fprintf(stderr, "Can't handle lines that long!\n"); + exit(2); + } + rc = read(0, buffer+bufoff, len); + if (rc <= 0) { + fprintf(stderr, "bye\n"); + shutdownThisThing=1; + break; + } + bufoff += rc; + buffer[bufoff] = '\0'; + while (strchr(buffer, '\n')) { + char *ptr = strchr(buffer, '\n'); + *ptr = '\0'; + processline(buffer); + ptr++; + memmove(buffer, ptr, strlen(ptr)+1); + bufoff = 0; + } + } +#else + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + while (!shutdownThisThing) { + int fd; + int interest; + int events; + struct timeval tv; + int rc; + zookeeper_interest(zh, &fd, &interest, &tv); + if (fd != -1) { + if (interest&ZOOKEEPER_READ) { + FD_SET(fd, &rfds); + } else { + FD_CLR(fd, &rfds); + } + if (interest&ZOOKEEPER_WRITE) { + FD_SET(fd, &wfds); + } else { + FD_CLR(fd, &wfds); + } + } else { + fd = 0; + } + FD_SET(0, &rfds); + rc = select(fd+1, &rfds, &wfds, &efds, &tv); + events = 0; + if (rc > 0) { + if (FD_ISSET(fd, &rfds)) { + events |= ZOOKEEPER_READ; + } + if (FD_ISSET(fd, &wfds)) { + events |= ZOOKEEPER_WRITE; + } + } + if(batchMode && processed==0){ + //batch mode + processline(cmd); + processed=1; + } + if (FD_ISSET(0, &rfds)) { + int rc; + int len = sizeof(buffer) - bufoff -1; + if (len <= 0) { + fprintf(stderr, "Can't handle lines that long!\n"); + exit(2); + } + rc = read(0, buffer+bufoff, len); + if (rc <= 0) { + fprintf(stderr, "bye\n"); + break; + } + bufoff += rc; + buffer[bufoff] = '\0'; + while (strchr(buffer, '\n')) { + char *ptr = strchr(buffer, '\n'); + *ptr = '\0'; + processline(buffer); + ptr++; + memmove(buffer, ptr, strlen(ptr)+1); + bufoff = 0; + } + } + zookeeper_process(zh, events); + } +#endif + if (to_send!=0) + fprintf(stderr,"Recvd %d responses for %d requests sent\n",recvd,sent); + zookeeper_close(zh); + return 0; +} +