diff --git a/.codespell/requirements.txt b/.codespell/requirements.txt index 407f17489c6..ddff454685c 100644 --- a/.codespell/requirements.txt +++ b/.codespell/requirements.txt @@ -1 +1 @@ -codespell==2.2.4 +codespell==2.2.5 diff --git a/src/Makefile b/src/Makefile index ecbd2753d9f..68b80a2a7cf 100644 --- a/src/Makefile +++ b/src/Makefile @@ -345,7 +345,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o 
geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/acl.c b/src/acl.c index 900746ece74..607512beb56 100644 --- a/src/acl.c +++ b/src/acl.c @@ -309,7 +309,7 @@ void *ACLListDupSds(void *item) { /* Structure used for handling key patterns with different key * based permissions. */ typedef struct { - int flags; /* The CMD_KEYS_* flags for this key pattern */ + int flags; /* The ACL key permission types for this key pattern */ sds pattern; /* The pattern to match keys against */ } keyPattern; @@ -963,7 +963,7 @@ void ACLResetFirstArgs(aclSelector *selector) { selector->allowed_firstargs = NULL; } -/* Add a first-arh to the list of subcommands for the user 'u' and +/* Add a first-arg to the list of subcommands for the user 'u' and * the command id specified. */ void ACLAddAllowedFirstArg(aclSelector *selector, unsigned long id, const char *sub) { /* If this is the first first-arg to be configured for @@ -1458,7 +1458,7 @@ void ACLInit(void) { * otherwise C_ERR is returned and errno is set to: * * EINVAL: if the username-password do not match. - * ENONENT: if the specified user does not exist at all. + * ENOENT: if the specified user does not exist at all. 
*/ int ACLCheckUserCredentials(robj *username, robj *password) { user *u = ACLGetUserByName(username->ptr,sdslen(username->ptr)); diff --git a/src/ae.c b/src/ae.c index 1b6422b2db8..ff60630e379 100644 --- a/src/ae.c +++ b/src/ae.c @@ -333,7 +333,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) { processed++; now = getMonotonicUs(); if (retval != AE_NOMORE) { - te->when = now + retval * 1000; + te->when = now + (monotime)retval * 1000; } else { te->id = AE_DELETED_EVENT_ID; } diff --git a/src/atomicvar.h b/src/atomicvar.h index 2c2969c33b9..1cbfd5b9df4 100644 --- a/src/atomicvar.h +++ b/src/atomicvar.h @@ -1,16 +1,41 @@ /* This file implements atomic counters using c11 _Atomic, __atomic or __sync * macros if available, otherwise we will throw an error when compile. * - * The exported interface is composed of three macros: + * The exported interface is composed of the following macros: * * atomicIncr(var,count) -- Increment the atomic counter * atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter + * atomicIncrGet(var,newvalue_var,count) -- Increment and get the atomic counter new value * atomicDecr(var,count) -- Decrement the atomic counter * atomicGet(var,dstvar) -- Fetch the atomic counter value * atomicSet(var,value) -- Set the atomic counter value * atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization * atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization - * + * + * Atomic operations on flags. + * Flag type can be int, long, long long or their unsigned counterparts. + * The value of the flag can be 1 or 0. + * + * atomicFlagGetSet(var,oldvalue_var) -- Get and set the atomic counter value + * + * NOTE1: __atomic* and _Atomic implementations can be actually elaborated to support any value by changing the + * hardcoded new value passed to __atomic_exchange* from 1 to @param count + * i.e oldvalue_var = atomic_exchange_explicit(&var, count). 
+ * However, in order to be compatible with the __sync functions family, we can use only 0 and 1. + * The only exchange alternative suggested by __sync is __sync_lock_test_and_set, + * but as described by the GNU manual for __sync_lock_test_and_set(): + * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fsync-Builtins.html + * "A target may support reduced functionality here by which the only valid value to store is the immediate constant 1. The exact value + * actually stored in *ptr is implementation defined." + * Hence, we can't rely on it for any value other than 1. + * We eventually chose to implement this method with __sync_val_compare_and_swap since it satisfies the functionality needed for atomicFlagGetSet + * (if the flag was 0 -> set to 1, if it's already 1 -> do nothing, but the final result is that the flag is set), + * and also it has a full barrier (__sync_lock_test_and_set has only an acquire barrier). + * + * NOTE2: Unlike other atomic types, which aren't guaranteed to be lock free, the c11 atomic_flag is. + * To check whether a type is lock free, atomic_is_lock_free() can be used. + * Limiting the flag type to atomic_flag can be considered in order to improve performance. 
+ * Never use return value from the macros, instead use the AtomicGetIncr() * if you need to get the current value and increment it atomically, like * in the following example: @@ -93,6 +118,8 @@ #define atomicGetIncr(var,oldvalue_var,count) do { \ oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \ } while(0) +#define atomicIncrGet(var, newvalue_var, count) \ + newvalue_var = atomicIncr(var,count) + (count) #define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed) #define atomicGet(var,dstvar) do { \ dstvar = atomic_load_explicit(&var,memory_order_relaxed); \ @@ -103,6 +130,8 @@ } while(0) #define atomicSetWithSync(var,value) \ atomic_store_explicit(&var,value,memory_order_seq_cst) +#define atomicFlagGetSet(var,oldvalue_var) \ + oldvalue_var = atomic_exchange_explicit(&var,1,memory_order_relaxed) #define REDIS_ATOMIC_API "c11-builtin" #elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && \ @@ -111,6 +140,8 @@ /* Implementation using __atomic macros. */ #define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) +#define atomicIncrGet(var, newvalue_var, count) \ + newvalue_var = __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) #define atomicGetIncr(var,oldvalue_var,count) do { \ oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \ } while(0) @@ -124,12 +155,16 @@ } while(0) #define atomicSetWithSync(var,value) \ __atomic_store_n(&var,value,__ATOMIC_SEQ_CST) +#define atomicFlagGetSet(var,oldvalue_var) \ + oldvalue_var = __atomic_exchange_n(&var,1,__ATOMIC_RELAXED) #define REDIS_ATOMIC_API "atomic-builtin" #elif defined(HAVE_ATOMIC) /* Implementation using __sync macros. 
*/ #define atomicIncr(var,count) __sync_add_and_fetch(&var,(count)) +#define atomicIncrGet(var, newvalue_var, count) \ + newvalue_var = __sync_add_and_fetch(&var,(count)) #define atomicGetIncr(var,oldvalue_var,count) do { \ oldvalue_var = __sync_fetch_and_add(&var,(count)); \ } while(0) @@ -149,6 +184,8 @@ ANNOTATE_HAPPENS_BEFORE(&var); \ while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \ } while(0) +#define atomicFlagGetSet(var,oldvalue_var) \ + oldvalue_var = __sync_val_compare_and_swap(&var,0,1) #define REDIS_ATOMIC_API "sync-builtin" #else diff --git a/src/cluster.c b/src/cluster.c index c6d7e484ed0..9366c32d1c8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -119,6 +119,20 @@ static inline int defaultClientPort(void) { return server.tls_cluster ? server.tls_port : server.port; } +/* When a cluster command is called, we need to decide whether to return TLS info or + * non-TLS info by the client's connection type. However if the command is called by + * a Lua script or RM_call, there is no connection in the fake client, so we use + * server.current_client here to get the real client if available. And if it is not + * available (modules may call commands without a real client), we return the default + * info, which is determined by server.tls_cluster. */ +static int shouldReturnTlsInfo(void) { + if (server.current_client && server.current_client->conn) { + return connIsTLS(server.current_client->conn); + } else { + return server.tls_cluster; + } +} + /* Links to the next and previous entries for keys in the same slot are stored * in the dict entry metadata. See Slot to Key API below. 
*/ #define dictEntryNextInSlot(de) \ @@ -2394,7 +2408,6 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc } clusterDelSlot(j); clusterAddSlot(sender,j); - bitmapClearBit(server.cluster->owner_not_claiming_slot, j); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); @@ -4930,6 +4943,8 @@ int clusterDelSlot(int slot) { } serverAssert(clusterNodeClearSlotBit(n,slot) == 1); server.cluster->slots[slot] = NULL; + /* Make owner_not_claiming_slot flag consistent with slot ownership information. */ + bitmapClearBit(server.cluster->owner_not_claiming_slot, slot); return C_OK; } @@ -5570,7 +5585,7 @@ void addNodeToNodeReply(client *c, clusterNode *node) { } /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ - addReplyLongLong(c, getNodeClientPort(node, connIsTLS(c->conn))); + addReplyLongLong(c, getNodeClientPort(node, shouldReturnTlsInfo())); addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); /* Add the additional endpoint information, this is all the known networking information @@ -5703,7 +5718,7 @@ void addShardReplyForClusterShards(client *c, list *nodes) { serverAssert((n->slot_info_pairs_count % 2) == 0); addReplyArrayLen(c, n->slot_info_pairs_count); for (int i = 0; i < n->slot_info_pairs_count; i++) - addReplyBulkLongLong(c, (unsigned long)n->slot_info_pairs[i]); + addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]); } else { /* If no slot info pair is provided, the node owns no slots */ addReplyArrayLen(c, 0); @@ -5946,7 +5961,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { /* CLUSTER NODES */ /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. 
*/ - sds nodes = clusterGenNodesDescription(c, 0, connIsTLS(c->conn)); + sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo()); addReplyVerbatim(c,nodes,sdslen(nodes),"txt"); sdsfree(nodes); } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { @@ -6312,7 +6327,7 @@ NULL /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ addReplyArrayLen(c,n->numslaves); for (j = 0; j < n->numslaves; j++) { - sds ni = clusterGenNodeDescription(c, n->slaves[j], connIsTLS(c->conn)); + sds ni = clusterGenNodeDescription(c, n->slaves[j], shouldReturnTlsInfo()); addReplyBulkCString(c,ni); sdsfree(ni); } @@ -7438,7 +7453,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co error_code == CLUSTER_REDIR_ASK) { /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ - int port = getNodeClientPort(n, connIsTLS(c->conn)); + int port = getNodeClientPort(n, shouldReturnTlsInfo()); addReplyErrorSds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", diff --git a/src/commands/cluster-shards.json b/src/commands/cluster-shards.json index dcaad3ea3eb..e7a08295347 100644 --- a/src/commands/cluster-shards.json +++ b/src/commands/cluster-shards.json @@ -26,7 +26,7 @@ "description": "an even number element array specifying the start and end slot numbers for slot ranges owned by this shard", "type": "array", "items": { - "type": "string" + "type": "integer" } }, "nodes": { diff --git a/src/db.c b/src/db.c index a7cb4dbd371..a913044abcb 100644 --- a/src/db.c +++ b/src/db.c @@ -188,8 +188,8 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { /* Add the key to the DB. It's up to the caller to increment the reference * counter of the value if needed. * - * If the update_if_existing argument is false, the the program is aborted - * if the key already exists, otherwise, it can fall back to dbOverwite. 
*/ + * If the update_if_existing argument is false, the program is aborted + * if the key already exists, otherwise, it can fall back to dbOverwrite. */ static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_existing) { dictEntry *existing; dictEntry *de = dictAddRaw(db->dict, key->ptr, &existing); diff --git a/src/debug.c b/src/debug.c index 809822f96a7..540768018ea 100644 --- a/src/debug.c +++ b/src/debug.c @@ -36,6 +36,7 @@ #include "quicklist.h" #include "fpconv_dtoa.h" #include "cluster.h" +#include "threads_mngr.h" #include #include @@ -75,6 +76,7 @@ void bugReportStart(void); void printCrashReport(void); void bugReportEnd(int killViaSignal, int sig); void logStackTrace(void *eip, int uplevel); +void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret); /* ================================= Debugging ============================== */ @@ -1819,6 +1821,102 @@ void closeDirectLogFiledes(int fd) { } #ifdef HAVE_BACKTRACE +#define BACKTRACE_MAX_SIZE 100 +#ifdef __linux__ +#include +#include +#include + +static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output); + +#define MAX_BUFF_LENGTH 256 +typedef struct { + char thread_name[16]; + int trace_size; + pid_t tid; + void *trace[BACKTRACE_MAX_SIZE]; +} stacktrace_data; + +static void *collect_stacktrace_data(void) { + /* allocate stacktrace_data struct */ + stacktrace_data *trace_data = zmalloc(sizeof(stacktrace_data)); + + /* Get the stack trace first! 
*/ + trace_data->trace_size = backtrace(trace_data->trace, BACKTRACE_MAX_SIZE); + + /* get the thread name */ + prctl(PR_GET_NAME, trace_data->thread_name); + + /* get the thread id */ + trace_data->tid = syscall(SYS_gettid); + + /* return the trace data */ + return trace_data; +} + +static void writeStacktraces(int fd, int uplevel) { + /* get the list of all the process's threads that don't block or ignore the THREADS_SIGNAL */ + pid_t pid = getpid(); + size_t len_tids = 0; + pid_t *tids = get_ready_to_signal_threads_tids(pid, THREADS_SIGNAL, &len_tids); + + /* This call returns either NULL or the stacktraces data from all tids */ + stacktrace_data **stacktraces_data = (stacktrace_data **)ThreadsManager_runOnThreads(tids, len_tids, collect_stacktrace_data); + + /* free tids */ + zfree(tids); + + /* ThreadsManager_runOnThreads returns NULL if it is already running */ + if (!stacktraces_data) return; + + + char buff[MAX_BUFF_LENGTH]; + pid_t calling_tid = syscall(SYS_gettid); + /* for backtrace_data in backtraces_data: */ + for (size_t i = 0; i < len_tids; i++) { + stacktrace_data *curr_stacktrace_data = stacktraces_data[i]; + /*ThreadsManager_runOnThreads might fail to collect the thread's data */ + if (!curr_stacktrace_data) continue; + + /* stacktrace header includes the tid and the thread's name */ + snprintf(buff, MAX_BUFF_LENGTH, "\n%d %s", curr_stacktrace_data->tid, curr_stacktrace_data->thread_name); + if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. 
*/}; + + /* skip kernel call to the signal handler, the signal handler and the callback addresses */ + int curr_uplevel = 3; + + if (curr_stacktrace_data->tid == calling_tid) { + /* skip signal syscall and ThreadsManager_runOnThreads */ + curr_uplevel += uplevel + 2; + /* Add an indication to header of the thread that is handling the log file */ + snprintf(buff, MAX_BUFF_LENGTH, " *\n"); + } else { + /* just add a new line */ + snprintf(buff, MAX_BUFF_LENGTH, "\n"); + } + + if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */}; + + /* add the stacktrace */ + backtrace_symbols_fd(curr_stacktrace_data->trace+curr_uplevel, curr_stacktrace_data->trace_size-curr_uplevel, fd); + + zfree(curr_stacktrace_data); + } + zfree(stacktraces_data); +} + +#else /* __linux__*/ + +static void writeStacktraces(int fd, int uplevel) { + void *trace[BACKTRACE_MAX_SIZE]; + + int trace_size = backtrace(trace, BACKTRACE_MAX_SIZE); + + char *msg = "\nBacktrace:\n"; + if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */}; + backtrace_symbols_fd(trace+uplevel, trace_size-uplevel, fd); +} +#endif /* __linux__ */ /* Logs the stack trace using the backtrace() call. This function is designed * to be called from signal handlers safely. @@ -1826,16 +1924,12 @@ void closeDirectLogFiledes(int fd) { * The uplevel argument indicates how many of the calling functions to skip. */ void logStackTrace(void *eip, int uplevel) { - void *trace[100]; - int trace_size = 0, fd = openDirectLogFiledes(); + int fd = openDirectLogFiledes(); char *msg; uplevel++; /* skip this function */ if (fd == -1) return; /* If we can't log there is anything to do. */ - /* Get the stack trace first! */ - trace_size = backtrace(trace, 100); - msg = "\n------ STACK TRACE ------\n"; if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. 
*/}; @@ -1847,9 +1941,8 @@ void logStackTrace(void *eip, int uplevel) { } /* Write symbols to log file */ - msg = "\nBacktrace:\n"; - if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */}; - backtrace_symbols_fd(trace+uplevel, trace_size-uplevel, fd); + ++uplevel; + writeStacktraces(fd, uplevel); /* Cleanup */ closeDirectLogFiledes(fd); @@ -2184,6 +2277,17 @@ static void sigsegvHandler(int sig, siginfo_t *info, void *secret) { bugReportEnd(1, sig); } +void setupDebugSigHandlers(void) { + setupSigSegvHandler(); + + struct sigaction act; + + sigemptyset(&act.sa_mask); + act.sa_flags = SA_SIGINFO; + act.sa_sigaction = sigalrmSignalHandler; + sigaction(SIGALRM, &act, NULL); +} + void setupSigSegvHandler(void) { /* Initialize the signal handler lock. Attempting to initialize an already initialized mutex or mutexattr results in undefined behavior. */ @@ -2260,7 +2364,7 @@ void bugReportEnd(int killViaSignal, int sig) { if (server.daemonize && server.supervised == 0 && server.pidfile) unlink(server.pidfile); if (!killViaSignal) { - /* To avoid issues with valgrind, we may wanna exit rahter than generate a signal */ + /* To avoid issues with valgrind, we may wanna exit rather than generate a signal */ if (server.use_exit_on_panic) { /* Using _exit to bypass false leak reports by gcc ASAN */ fflush(stdout); @@ -2305,16 +2409,21 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) { /* =========================== Software Watchdog ============================ */ #include -void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) { +void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret) { #ifdef HAVE_BACKTRACE ucontext_t *uc = (ucontext_t*) secret; #else (void)secret; #endif - UNUSED(info); UNUSED(sig); - serverLogFromHandler(LL_WARNING,"\n--- WATCHDOG TIMER EXPIRED ---"); + /* SIGALRM can be sent explicitly to the process calling kill() to get the stacktraces, + or every watchdog_period interval. 
In the last case, si_pid is not set */ + if(info->si_pid == 0) { + serverLogFromHandler(LL_WARNING,"\n--- WATCHDOG TIMER EXPIRED ---"); + } else { + serverLogFromHandler(LL_WARNING, "\nReceived SIGALRM"); + } #ifdef HAVE_BACKTRACE logStackTrace(getAndSetMcontextEip(uc, NULL), 1); #else @@ -2338,25 +2447,10 @@ void watchdogScheduleSignal(int period) { setitimer(ITIMER_REAL, &it, NULL); } void applyWatchdogPeriod(void) { - struct sigaction act; - /* Disable watchdog when period is 0 */ if (server.watchdog_period == 0) { watchdogScheduleSignal(0); /* Stop the current timer. */ - - /* Set the signal handler to SIG_IGN, this will also remove pending - * signals from the queue. */ - sigemptyset(&act.sa_mask); - act.sa_flags = 0; - act.sa_handler = SIG_IGN; - sigaction(SIGALRM, &act, NULL); } else { - /* Setup the signal handler. */ - sigemptyset(&act.sa_mask); - act.sa_flags = SA_SIGINFO; - act.sa_sigaction = watchdogSignalHandler; - sigaction(SIGALRM, &act, NULL); - /* If the configured period is smaller than twice the timer period, it is * too short for the software watchdog to work reliably. Fix it now * if needed. */ @@ -2374,3 +2468,114 @@ void debugDelay(int usec) { if (usec < 0) usec = (rand() % -usec) == 0 ? 1: 0; if (usec) usleep(usec); } + +#ifdef HAVE_BACKTRACE +#ifdef __linux__ + +/* =========================== Stacktrace Utils ============================ */ +#define TIDS_INITIAL_SIZE 50 + +/** If it doesn't block and doesn't ignore, return 1 (the thread will handle the signal) + * If thread tid blocks or ignores sig_num returns 0 (thread is not ready to catch the signal). 
+ * also returns 0 if something is wrong and prints a warning message to the log file **/ +static int is_thread_ready_to_signal(pid_t pid, pid_t tid, int sig_num) { + /* open the thread's status file */ + char buff[MAX_BUFF_LENGTH]; + snprintf(buff, MAX_BUFF_LENGTH, "/proc/%d/task/%d/status", pid, tid); + FILE *thread_status_file = fopen(buff, "r"); + if (thread_status_file == NULL) { + serverLog(LL_WARNING, + "tid:%d: failed to open /proc/%d/task/%d/status file", tid, pid, tid); + return 0; + } + + int ret = 1; + size_t field_name_len = strlen("SigBlk:"); /* SigIgn has the same length */ + char *line = NULL; + size_t fields_count = 2; + while ((line = fgets(buff, MAX_BUFF_LENGTH, thread_status_file)) && fields_count) { + /* iterate the file until we reach SigBlk or SigIgn field line */ + if (!strncmp(buff, "SigBlk:", field_name_len) || !strncmp(buff, "SigIgn:", field_name_len)) { + /* the mask is a hex bitmap in which bit (sig_num - 1) stands for signal number sig_num */ + unsigned long long sig_mask = strtoull(buff + field_name_len, NULL, 16); + if(sig_mask & (1ULL << (sig_num - 1))) { /* if the signal is blocked/ignored return 0 */ + ret = 0; + break; + } + --fields_count; + } + } + + fclose(thread_status_file); + + /* if we reached EOF, it means we haven't found SigBlk or/and SigIgn, something is wrong */ + if (line == NULL) { + ret = 0; + serverLog(LL_WARNING, + "tid:%d: failed to find SigBlk or/and SigIgn field(s) in /proc/%d/task/%d/status file", tid, pid, tid); + } + return ret; +} + +/** Returns a list of all the process's (pid) threads that can receive signal sig_num. + * Also updates tids_len_output to the number of valid threads' ids in the returned array + * NOTE: It is the caller's responsibility to free the returned array with zfree(). */ +static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output) { + /* Initialize the path to the process threads' directory. 
*/ + char path_buff[MAX_BUFF_LENGTH]; + snprintf(path_buff, MAX_BUFF_LENGTH, "/proc/%d/task", pid); + + /* Get the directory handle. On failure return NULL without touching *tids_len_output. */ + DIR *dir; + if (!(dir = opendir(path_buff))) return NULL; + + size_t tids_cap = TIDS_INITIAL_SIZE; + pid_t *tids = zmalloc(sizeof(pid_t) * tids_cap); + + size_t tids_count = 0; + struct dirent *entry; + pid_t calling_tid = syscall(SYS_gettid); + int current_thread_index = -1; + + /* Each thread is represented by a directory */ + while ((entry = readdir(dir)) != NULL) { + if (entry->d_type == DT_DIR) { + /* Skip irrelevant directories. */ + if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) { + /* the thread's directory name is equivalent to its tid. */ + pid_t tid = atoi(entry->d_name); + + if(!is_thread_ready_to_signal(pid, tid, sig_num)) continue; + + if(tid == calling_tid) { + current_thread_index = tids_count; + } + + /* increase tids capacity if needed */ + if(tids_count >= tids_cap) { + tids_cap *= 2; + tids = zrealloc(tids, sizeof(pid_t) * tids_cap); + } + + /* save the thread id */ + tids[tids_count++] = tid; + } + } + } + + /* Swap the last tid with the calling thread's tid, so the calling thread ends up last in the array */ + if(current_thread_index != -1) { + pid_t last_tid = tids[tids_count - 1]; + + tids[tids_count - 1] = calling_tid; + tids[current_thread_index] = last_tid; + } + + + closedir(dir); + + *tids_len_output = tids_count; + return tids; +} +#endif /* __linux__ */ +#endif /* HAVE_BACKTRACE */ diff --git a/src/sentinel.c b/src/sentinel.c index 238be905f6d..cce5cc3b995 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -4129,7 +4129,7 @@ NULL else if (!strcasecmp(c->argv[2]->ptr,"get") && c->argc >= 4) sentinelConfigGetCommand(c); else - addReplyError(c, "Only SENTINEL CONFIG GET [ ...]/ SET [ ...] are supported."); + addReplyError(c, "Only SENTINEL CONFIG GET [ ...] / SET [ ...] 
are supported."); } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) { /* SENTINEL INFO-CACHE */ if (c->argc < 2) goto numargserr; diff --git a/src/server.c b/src/server.c index 929f9b7cdef..6b62056d3df 100644 --- a/src/server.c +++ b/src/server.c @@ -38,6 +38,7 @@ #include "functions.h" #include "hdr_histogram.h" #include "syscheck.h" +#include "threads_mngr.h" #include #include @@ -2563,6 +2564,7 @@ void initServer(void) { signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); + ThreadsManager_init(); makeThreadKillable(); if (server.syslog_enabled) { @@ -6526,7 +6528,7 @@ void setupSignalHandlers(void) { sigaction(SIGTERM, &act, NULL); sigaction(SIGINT, &act, NULL); - setupSigSegvHandler(); + setupDebugSigHandlers(); } /* This is the signal handler for children process. It is currently useful diff --git a/src/server.h b/src/server.h index 52a2f858488..ac324eb6cea 100644 --- a/src/server.h +++ b/src/server.h @@ -2003,7 +2003,7 @@ struct redisServer { /* Scripting */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ int pre_command_oom_state; /* OOM before command (script?) was started */ - int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */ + int script_disable_deny_script; /* Allow running commands marked "noscript" inside a script. */ /* Lazy free */ int lazyfree_lazy_eviction; int lazyfree_lazy_expire; @@ -3707,6 +3707,7 @@ void _serverPanic(const char *file, int line, const char *msg, ...) 
void _serverPanic(const char *file, int line, const char *msg, ...); #endif void serverLogObjectDebugInfo(const robj *o); +void setupDebugSigHandlers(void); void setupSigSegvHandler(void); void removeSigSegvHandlers(void); const char *getSafeInfoString(const char *s, size_t len, char **tmp); diff --git a/src/threads_mngr.c b/src/threads_mngr.c new file mode 100644 index 00000000000..cb93dcf336a --- /dev/null +++ b/src/threads_mngr.c @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "threads_mngr.h" +/* Anti-warning macro... */ +#define UNUSED(V) ((void) V) + +#ifdef __linux__ +#include "zmalloc.h" +#include "atomicvar.h" +#include "server.h" + +#include +#include +#include +#include +#include + +#define IN_PROGRESS 1 +static const clock_t RUN_ON_THREADS_TIMEOUT = 2; + +/*================================= Globals ================================= */ + +static run_on_thread_cb g_callback = NULL; +static volatile size_t g_tids_len = 0; +static void **g_output_array = NULL; +static redisAtomic size_t g_thread_ids = 0; +static redisAtomic size_t g_num_threads_done = 0; + +static sem_t wait_for_threads_sem; + +/* This flag is set while ThreadsManager_runOnThreads is running */ +static redisAtomic int g_in_progress = 0; + +/*============================ Internal prototypes ========================== */ + +static void invoke_callback(int sig); +/* returns 0 if it is safe to start, IN_PROGRESS otherwise. */ +static int test_and_start(void); +static void wait_threads(void); +/* Clean up global variable. 
+Assuming we are under the g_in_progress protection, this is not a thread-safe function */ +static void ThreadsManager_cleanups(void); + +/*============================ API functions implementations ========================== */ + +void ThreadsManager_init(void) { + /* Register the handler of THREADS_SIGNAL, the signal sent by ThreadsManager_runOnThreads */ + struct sigaction act; + sigemptyset(&act.sa_mask); + /* Not setting the SA_RESTART flag means that if a signal handler is invoked while a + system call or library function call is blocked, the default behavior is used, + i.e., the call fails with the error EINTR */ + act.sa_flags = 0; + act.sa_handler = invoke_callback; + sigaction(THREADS_SIGNAL, &act, NULL); +} + +void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { + /* Check if it is safe to start running. If not - return */ + if(test_and_start() == IN_PROGRESS) { + return NULL; + } + + /* Update g_callback */ + g_callback = callback; + + /* Set g_tids_len */ + g_tids_len = tids_len; + + /* Allocate the output buffer zeroed, so the slot of a thread that didn't report before the timeout reads as NULL */ + g_output_array = zcalloc(sizeof(void*) * tids_len); + + /* Initialize a semaphore that we will be waiting on for the threads + use pshared = 0 to indicate the semaphore is shared between the process's threads (and not between processes), + and value = 0 as the initial semaphore value. 
*/ + sem_init(&wait_for_threads_sem, 0, 0); + + /* Send signal to all the threads in tids */ + pid_t pid = getpid(); + for (size_t i = 0; i < tids_len ; ++i) { + syscall(SYS_tgkill, pid, tids[i], THREADS_SIGNAL); + } + + /* Wait for all the threads to write to the output array, or until timeout is reached */ + wait_threads(); + + void **ret = g_output_array; + + /* Cleanups to allow next execution */ + ThreadsManager_cleanups(); + + return ret; +} + +/*============================ Internal functions implementations ========================== */ + + +static int test_and_start(void) { + /* atomicFlagGetSet sets the variable to 1 and returns the previous value */ + int prev_state; + atomicFlagGetSet(g_in_progress, prev_state); + + /* If prev_state is 1, g_in_progress was on. */ + return prev_state; +} + +static void invoke_callback(int sig) { + UNUSED(sig); + + size_t thread_id; + atomicGetIncr(g_thread_ids, thread_id, 1); + g_output_array[thread_id] = g_callback(); + size_t curr_done_count; + atomicIncrGet(g_num_threads_done, curr_done_count, 1); + + /* last thread shuts down the light */ + if (curr_done_count == g_tids_len) { + sem_post(&wait_for_threads_sem); + } +} + +static void wait_threads(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + + /* calculate relative time until timeout */ + ts.tv_sec += RUN_ON_THREADS_TIMEOUT; + + int status = 0; + + /* lock the semaphore until the semaphore value rises above zero or a signal + handler interrupts the call. In the later case continue to wait. */ + while ((status = sem_timedwait(&wait_for_threads_sem, &ts)) == -1 && errno == EINTR) { + serverLog(LL_WARNING, "threads_mngr: waiting for threads' output was interrupted by signal. 
Continue waiting."); + continue; + } + + if (status == -1) { + if (errno == ETIMEDOUT) { + serverLog(LL_WARNING, "threads_mngr: waiting for threads' output timed out"); + } + } +} + +static void ThreadsManager_cleanups(void) { + g_callback = NULL; + g_tids_len = 0; + g_output_array = NULL; + g_thread_ids = 0; + g_num_threads_done = 0; + sem_destroy(&wait_for_threads_sem); + + /* Lastly, turn off g_in_progress */ + atomicSet(g_in_progress, 0); +} +#else + +void ThreadsManager_init(void) { + /* DO NOTHING */ +} + +void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { + /* DO NOTHING */ + UNUSED(tids); + UNUSED(tids_len); + UNUSED(callback); + return NULL; +} + +#endif /* __linux__ */ diff --git a/src/threads_mngr.h b/src/threads_mngr.h new file mode 100644 index 00000000000..2dc82ca0ec3 --- /dev/null +++ b/src/threads_mngr.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once + +#include "fmacros.h" + +#include +#include + +/** This is an API to invoke callback on a list of threads using a user defined signal handler. + * NOTE: This API is supported only on Linux systems. + * Calling the functions below on any other system does nothing. +*/ + +#define THREADS_SIGNAL SIGUSR2 + +/* Callback signature */ +typedef void*(*run_on_thread_cb)(void); + +/* Register the process to THREADS_SIGNAL */ +void ThreadsManager_init(void); + +/** @brief Invoke callback by each thread in tids. + * + * @param tids An array of threads that need to invoke callback. + * @param tids_len The number of threads in @param tids. + * @param callback A callback to be invoked by each thread in @param tids. + * + * NOTES: + * It is assumed that all the threads don't block or ignore THREADS_SIGNAL. + * + * It is safe to include the calling thread in @param tids. However, be aware that subsequent tids will + * not be signaled until the calling thread returns from the callback invocation. + * Hence, it is recommended to place the calling thread last in @param tids. + * + * The function returns only when @param tids_len threads have returned from @param callback, or when the wait for them times out. + * + * @return NULL if ThreadsManager_runOnThreads is already in the middle of execution. + * Otherwise, it returns an array of the threads' return values from @param callback.
+ * NOTES: + * The indices of the outputs in the output array are NOT associated with the threads indices in @param tids. + * + * The returned array length will be @param tids_len, but some of the entries might be set to NULL if the + * invocation of @param callback was unsuccessful. + * + * The output array should be freed by the caller by calling zfree(). +**/ + +void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback); diff --git a/tests/integration/corrupt-dump-fuzzer.tcl b/tests/integration/corrupt-dump-fuzzer.tcl index 9cd4ff913ad..45705745ee6 100644 --- a/tests/integration/corrupt-dump-fuzzer.tcl +++ b/tests/integration/corrupt-dump-fuzzer.tcl @@ -169,6 +169,11 @@ foreach sanitize_dump {no yes} { incr stat_terminated_by_signal $by_signal if {$by_signal != 0 || $sanitize_dump == yes} { + if {$::dump_logs} { + set srv [get_srv 0] + dump_server_log $srv + } + puts "Server crashed (by signal: $by_signal), with payload: $printable_dump" set print_commands true } diff --git a/tests/integration/logging.tcl b/tests/integration/logging.tcl index 4f8639be0de..7bd2eb83c25 100644 --- a/tests/integration/logging.tcl +++ b/tests/integration/logging.tcl @@ -2,11 +2,13 @@ tags {"external:skip"} { set system_name [string tolower [exec uname -s]] set backtrace_supported 0 +set threads_mngr_supported 0 # We only support darwin or Linux with glibc if {$system_name eq {darwin}} { set backtrace_supported 1 } elseif {$system_name eq {linux}} { + set threads_mngr_supported 1 # Avoid the test on libmusl, which does not support backtrace # and on static binaries (ldd exit code 1) where we can't detect libmusl catch { @@ -23,6 +25,17 @@ if {$backtrace_supported} { test "Server is able to generate a stack trace on selected systems" { r config set watchdog-period 200 r debug sleep 1 + if {$threads_mngr_supported} { + assert_equal [count_log_message 0 "failed to open /proc/"] 0 + assert_equal [count_log_message 0 "failed to find SigBlk or/and SigIgn"] 0 
+ assert_equal [count_log_message 0 "threads_mngr: waiting for threads' output was interrupted by signal"] 0 + assert_equal [count_log_message 0 "threads_mngr: waiting for threads' output timed out"] 0 + assert_equal [count_log_message 0 "bioProcessBackgroundJobs"] 3 + } + + set pattern "*redis-server *main*" + set res [wait_for_log_messages 0 \"$pattern\" 0 100 100] + if {$::verbose} { puts $res } set pattern "*debugCommand*" set res [wait_for_log_messages 0 \"$pattern\" 0 100 100] if {$::verbose} { puts $res } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 21fa35d4d5b..6c3714e9ad0 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -197,6 +197,12 @@ proc srv {args} { dict get $srv $property } +# Take an index to get a srv. +proc get_srv {level} { + set srv [lindex $::servers end+$level] + return $srv +} + # Provide easy access to the client for the inner server. It's possible to # prepend the argument list with a negative level to access clients for # servers running in outer blocks. diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index fc0ef61e29c..15dae20e4c3 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -350,7 +350,7 @@ start_server {tags {"expire"}} { r restore foo17 100000 $encoded r restore foo18 [expr [clock milliseconds]+100000] $encoded absttl - # Assert that each TTL-relatd command are persisted with absolute timestamps in AOF + # Assert that each TTL-related command are persisted with absolute timestamps in AOF assert_aof_content $aof { {select *} {set foo1 bar PXAT *} diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index d821c7adc8b..564f1e4b2a4 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -493,7 +493,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r set expected_frag 1.3 r debug mallctl-str thread.tcache.flush VOID - # fill the first slab containin 32 regs of 640 bytes. 
+ # fill the first slab containing 32 regs of 640 bytes. for {set j 0} {$j < 32} {incr j} { r setrange "_$j" 600 x r debug mallctl-str thread.tcache.flush VOID diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index bea8508b1e4..666b5930e43 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -876,7 +876,7 @@ start_server {tags {"tracking network logreqres:skip"}} { $rd close } -# Just some extra covergae for --log-req-res, because we do not +# Just some extra coverage for --log-req-res, because we do not # run the full tracking unit in that mode start_server {tags {"tracking network"}} { test {Coverage: Basic CLIENT CACHING} { diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 993b6d13529..586d3d30865 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -1120,7 +1120,7 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { assert_equal {} [$rd read] $rd deferred 0 # We want to force key deletion to be propagated to the replica - # in order to verify it was expiered on the replication stream. + # in order to verify it was expired on the replication stream. $rd set somekey1 someval1 $rd exists k r set somekey2 someval2 @@ -1168,7 +1168,7 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { r client unblock $id assert_equal {} [$rd read] # We want to force key deletion to be propagated to the replica - # in order to verify it was expiered on the replication stream. + # in order to verify it was expired on the replication stream. $rd exists k assert_equal {0} [$rd read] assert_replication_stream $repl {