Skip to content

Commit

Permalink
ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointe…
Browse files Browse the repository at this point in the history
…r (Marshall McMullen via michim)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1484357 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Michi Mutsuzaki committed May 19, 2013
1 parent 4db12a9 commit 37973fa
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 252 deletions.
5 changes: 4 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ NEW FEATURES:
ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)

ZOOKEEPER-107. Allow dynamic changes to server cluster membership (Alex Shraer via breed)


ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
(Marshall McMullen via michim)

BUGFIXES:

ZOOKEEPER-786. Exception in ZooKeeper.toString
Expand Down
73 changes: 73 additions & 0 deletions src/c/include/zookeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ typedef struct zoo_op_result {
typedef void (*watcher_fn)(zhandle_t *zh, int type,
int state, const char *path,void *watcherCtx);

/**
* \brief typedef for setting the log callback. It's a function pointer which
* returns void and accepts a const char* as its only argument.
*
* \param message message to be passed to the callback function.
*/
typedef void (*log_callback_fn)(const char *message);

/**
* \brief create a handle to used communicate with zookeeper.
*
Expand Down Expand Up @@ -457,6 +465,45 @@ typedef void (*watcher_fn)(zhandle_t *zh, int type,
ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
int recv_timeout, const clientid_t *clientid, void *context, int flags);

/**
* \brief create a handle to communicate with zookeeper.
*
* This function is identical to \ref zookeeper_init except it allows one
* to specify an additional callback to be used for all logging for that
* specific connection. For more details on the logging callback see
* \ref zoo_get_log_callback and \ref zoo_set_log_callback.
*
* This method creates a new handle and a zookeeper session that corresponds
* to that handle. Session establishment is asynchronous, meaning that the
* session should not be considered established until (and unless) an
* event of state ZOO_CONNECTED_STATE is received.
* \param host comma separated host:port pairs, each corresponding to a zk
* server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
* \param fn the global watcher callback function. When notifications are
* triggered this function will be invoked.
* \param clientid the id of a previously established session that this
* client will be reconnecting to. Pass 0 if not reconnecting to a previous
* session. Clients can access the session id of an established, valid,
* connection by calling \ref zoo_client_id. If the session corresponding to
* the specified clientid has expired, or if the clientid is invalid for
* any reason, the returned zhandle_t will be invalid -- the zhandle_t
* state will indicate the reason for failure (typically
* ZOO_EXPIRED_SESSION_STATE).
* \param context the handback object that will be associated with this instance
* of zhandle_t. Application can access it (for example, in the watcher
* callback) using \ref zoo_get_context. The object is not used by zookeeper
* internally and can be null.
* \param flags reserved for future use. Should be set to zero.
* \param log_callback All log messages will be passed to this callback function.
* For more details see \ref zoo_get_log_callback and \ref zoo_set_log_callback.
* \return a pointer to the opaque zhandle structure. If it fails to create
* a new zhandle the function returns NULL and the errno variable
* indicates the reason.
*/
ZOOAPI zhandle_t *zookeeper_init2(const char *host, watcher_fn fn,
int recv_timeout, const clientid_t *clientid, void *context, int flags,
log_callback_fn log_callback);

/**
* \brief update the list of servers this client will connect to.
*
Expand Down Expand Up @@ -1407,6 +1454,32 @@ ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel);
*/
ZOOAPI void zoo_set_log_stream(FILE* logStream);

/**
* \brief gets the callback to be used by this connection for logging.
*
* This is a per-connection logging mechanism that will take priority over
* the library-wide default log stream. That is, zookeeper library will first
* try to use a per-connection callback if available and if not, will fallback
* to using the logging stream. Passing in NULL resets the callback and will
* cause it to then fallback to using the logging stream as described in \ref
* zoo_set_log_stream.
*/
ZOOAPI log_callback_fn zoo_get_log_callback(const zhandle_t *zh);

/**
* \brief sets the callback to be used by the library for logging
*
* Setting this callback has the effect of overriding the default log stream.
* Zookeeper will first try to use a per-connection callback if available
* and if not, will fallback to using the logging stream. Passing in NULL
* resets the callback and will cause it to then fallback to using the logging
* stream as described in \ref zoo_set_log_stream.
*
* Note: The provided callback will be invoked by multiple threads and therefore
* it needs to be thread-safe.
*/
ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);

/**
* \brief enable/disable quorum endpoint order randomization
*
Expand Down
33 changes: 16 additions & 17 deletions src/c/include/zookeeper_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@ extern "C" {
#endif

extern ZOOAPI ZooLogLevel logLevel;
#define LOGSTREAM getLogStream()

#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x)
#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x)
#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x)
#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)

ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName,
const char* message);

ZOOAPI const char* format_log_message(const char* format,...);

FILE* getLogStream();
#define LOGCALLBACK(_zh) zoo_get_log_callback(_zh)
#define LOGSTREAM NULL

#define LOG_ERROR(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
log_message(_cb, ZOO_LOG_LEVEL_ERROR, __LINE__, __func__, __VA_ARGS__)
#define LOG_WARN(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
log_message(_cb, ZOO_LOG_LEVEL_WARN, __LINE__, __func__, __VA_ARGS__)
#define LOG_INFO(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
log_message(_cb, ZOO_LOG_LEVEL_INFO, __LINE__, __func__, __VA_ARGS__)
#define LOG_DEBUG(_cb, ...) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
log_message(_cb, ZOO_LOG_LEVEL_DEBUG, __LINE__, __func__, __VA_ARGS__)

ZOOAPI void log_message(log_callback_fn callback, ZooLogLevel curLevel,
int line, const char* funcName, const char* format, ...);

FILE* zoo_get_log_stream();

#ifdef __cplusplus
}
Expand Down
30 changes: 15 additions & 15 deletions src/c/src/load_gen.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void listener(zhandle_t *zzh, int type, int state, const char *path,void* ctx) {
void create_completion(int rc, const char *name, const void *data) {
incCounter(-1);
if(rc!=ZOK){
LOG_ERROR(("Failed to create a node rc=%d",rc));
LOG_ERROR(LOGSTREAM, "Failed to create a node rc=%d",rc);
}
}

Expand All @@ -102,7 +102,7 @@ int doCreateNodes(const char* root, int count){
rc=zoo_acreate(zh, nodeName, "first", 5, &ZOO_OPEN_ACL_UNSAFE, 0,
create_completion, 0);
if(i%1000==0){
LOG_INFO(("Created %s",nodeName));
LOG_INFO(LOGSTREAM, "Created %s", nodeName);
}
if(rc!=ZOK) return rc;
}
Expand All @@ -116,7 +116,7 @@ int createRoot(const char* root){
void write_completion(int rc, const struct Stat *stat, const void *data) {
incCounter(-1);
if(rc!=ZOK){
LOG_ERROR(("Failed to write a node rc=%d",rc));
LOG_ERROR(LOGSTREAM, "Failed to write a node rc=%d",rc);
}
}

Expand All @@ -137,13 +137,13 @@ void read_completion(int rc, const char *value, int value_len,
const struct Stat *stat, const void *data) {
incCounter(-1);
if(rc!=ZOK){
LOG_ERROR(("Failed to read a node rc=%d",rc));
LOG_ERROR(LOGSTREAM, "Failed to read a node rc=%d",rc);
return;
}
if(memcmp(value,"second",6)!=0){
char buf[value_len+1];
memcpy(buf,value,value_len);buf[value_len]=0;
LOG_ERROR(("Invalid read, expected [second], received [%s]\n",buf));
LOG_ERROR(LOGSTREAM, "Invalid read, expected [second], received [%s]\n",buf);
exit(1);
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ int recursiveDelete(const char* root){
int rc=zoo_get_children(zh,root,0,&children);
if(rc!=ZNONODE){
if(rc!=ZOK){
LOG_ERROR(("Failed to get children of %s, rc=%d",root,rc));
LOG_ERROR(LOGSTREAM, "Failed to get children of %s, rc=%d",root,rc);
return rc;
}
for(i=0;i<children.count; i++){
Expand All @@ -214,10 +214,10 @@ int recursiveDelete(const char* root){
free_String_vector(&children);
}
if(deletedCounter%1000==0)
LOG_INFO(("Deleting %s",root));
LOG_INFO(LOGSTREAM, "Deleting %s",root);
rc=zoo_delete(zh,root,-1);
if(rc!=ZOK){
LOG_ERROR(("Failed to delete znode %s, rc=%d",root,rc));
LOG_ERROR(LOGSTREAM, "Failed to delete znode %s, rc=%d",root,rc);
}else
deletedCounter++;
return rc;
Expand Down Expand Up @@ -245,15 +245,15 @@ int main(int argc, char **argv) {
if (!zh)
return errno;

LOG_INFO(("Checking server connection..."));
LOG_INFO(LOGSTREAM, "Checking server connection...");
ensureConnected();
if(cleaning==1){
int rc = 0;
deletedCounter=0;
rc=recursiveDelete(argv[2]);
if(rc==ZOK){
LOG_INFO(("Succesfully deleted a subtree starting at %s (%d nodes)",
argv[2],deletedCounter));
LOG_INFO(LOGSTREAM, "Succesfully deleted a subtree starting at %s (%d nodes)",
argv[2],deletedCounter);
exit(0);
}
exit(1);
Expand All @@ -262,18 +262,18 @@ int main(int argc, char **argv) {
createRoot(argv[2]);
while(1) {
ensureConnected();
LOG_INFO(("Creating children for path %s",argv[2]));
LOG_INFO(LOGSTREAM, "Creating children for path %s",argv[2]);
doCreateNodes(argv[2],nodeCount);
waitCounter();

LOG_INFO(("Starting the write cycle for path %s",argv[2]));
LOG_INFO(LOGSTREAM, "Starting the write cycle for path %s",argv[2]);
doWrites(argv[2],nodeCount);
waitCounter();
LOG_INFO(("Starting the read cycle for path %s",argv[2]));
LOG_INFO(LOGSTREAM, "Starting the read cycle for path %s",argv[2]);
doReads(argv[2],nodeCount);
waitCounter();

LOG_INFO(("Starting the delete cycle for path %s",argv[2]));
LOG_INFO(LOGSTREAM, "Starting the delete cycle for path %s",argv[2]);
doDeletes(argv[2],nodeCount);
waitCounter();
}
Expand Down
22 changes: 11 additions & 11 deletions src/c/src/mt_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ unsigned __stdcall do_completion( void * );

int handle_error(SOCKET sock, char* message)
{
LOG_ERROR(("%s. %d",message, WSAGetLastError()));
LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError());
closesocket (sock);
return -1;
}
Expand All @@ -131,7 +131,7 @@ int create_socket_pair(SOCKET fds[2])

SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP);
if (lst == INVALID_SOCKET ){
LOG_ERROR(("Error creating socket. %d",WSAGetLastError()));
LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
return -1;
}
memset(&inaddr, 0, sizeof(inaddr));
Expand Down Expand Up @@ -218,7 +218,7 @@ void start_threads(zhandle_t* zh)
// use api_prolog() to make sure zhandle doesn't get destroyed
// while initialization is in progress
api_prolog(zh);
LOG_DEBUG(("starting threads..."));
LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");
rc=pthread_create(&adaptor->io, 0, do_io, zh);
assert("pthread_create() failed for the IO thread"&&!rc);
rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
Expand All @@ -232,17 +232,17 @@ int adaptor_init(zhandle_t *zh)
pthread_mutexattr_t recursive_mx_attr;
struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads));
if (!adaptor_threads) {
LOG_ERROR(("Out of memory"));
LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
return -1;
}

/* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
#ifdef WIN32
if (create_socket_pair(adaptor_threads->self_pipe) == -1){
LOG_ERROR(("Can't make a socket."));
LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
#else
if(pipe(adaptor_threads->self_pipe)==-1) {
LOG_ERROR(("Can't make a pipe %d",errno));
LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
#endif
free(adaptor_threads);
return -1;
Expand Down Expand Up @@ -365,7 +365,7 @@ void *do_io(void *v)

api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG(("started IO thread"));
LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
fds[0].fd=adaptor_threads->self_pipe[0];
fds[0].events=POLLIN;
while(!zh->close_requested) {
Expand Down Expand Up @@ -400,7 +400,7 @@ void *do_io(void *v)
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG(("started IO thread"));
LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds);
while(!zh->close_requested) {
struct timeval tv;
Expand Down Expand Up @@ -444,7 +444,7 @@ void *do_io(void *v)
break;
}
api_epilog(zh, 0);
LOG_DEBUG(("IO thread terminated"));
LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
return 0;
}

Expand All @@ -457,7 +457,7 @@ void *do_completion(void *v)
zhandle_t *zh = v;
api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG(("started completion thread"));
LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
while(!zh->close_requested) {
pthread_mutex_lock(&zh->completions_to_process.lock);
while(!zh->completions_to_process.head && !zh->close_requested) {
Expand All @@ -467,7 +467,7 @@ void *do_completion(void *v)
process_completions(zh);
}
api_epilog(zh, 0);
LOG_DEBUG(("completion thread terminated"));
LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/c/src/zk_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct _zhandle {
clientid_t client_id; // client-id
long long last_zxid; // last zookeeper ID
auth_list_head_t auth_h; // authentication data list
log_callback_fn log_callback; // Callback for logging (falls back to logging to stderr)

// Primer storage
struct _buffer_list primer_buffer; // The buffer used for the handshake at the start of a connection
Expand Down
Loading

0 comments on commit 37973fa

Please sign in to comment.