diff --git a/src/istgt.c b/src/istgt.c index adff426d..b8a70964 100644 --- a/src/istgt.c +++ b/src/istgt.c @@ -2352,8 +2352,8 @@ fatal_handler(int sig) void *array[20]; size_t size; - fprintf(stderr, "Fatal signal received: %d\n", sig); - fprintf(stderr, "Stack trace:\n"); + ISTGT_ERRLOG("Fatal signal received: %d\n", sig); + ISTGT_ERRLOG("Stack trace:\n"); size = backtrace(array, 20); backtrace_symbols_fd(array, size, STDERR_FILENO); diff --git a/src/istgt_lu.h b/src/istgt_lu.h index a3bfb4fa..047be542 100644 --- a/src/istgt_lu.h +++ b/src/istgt_lu.h @@ -904,6 +904,10 @@ typedef struct istgt_lu_disk_t { pthread_mutex_t luworker_rmutex[ISTGT_MAX_NUM_LUWORKERS]; pthread_cond_t luworker_rcond[ISTGT_MAX_NUM_LUWORKERS]; + + // cleanup thread for spec mempool entries + pthread_t deadlist_cleanup_thread; + /* stats */ struct { uint64_t used; diff --git a/src/replication.c b/src/replication.c index f3a1df32..d7c479b4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -54,6 +54,8 @@ int replication_initialized = 0; size_t rcmd_mempool_count = RCMD_MEMPOOL_ENTRIES; struct timespec istgt_start_time; +static void destroy_rcommon_deadlist(spec_t *spec); +static void destroy_resp_list(rcommon_cmd_t *rcomm_cmd, int copies_sent); static int start_rebuild(void *buf, replica_t *replica, uint64_t data_len); static void handle_mgmt_conn_error(replica_t *r, int sfd, struct epoll_event *events, int ev_count); @@ -4352,6 +4354,18 @@ initialize_replication() void destroy_volume(spec_t *spec) { + int ret = 0; + void *res; + + pthread_cancel(spec->deadlist_cleanup_thread); + ret = pthread_join(spec->deadlist_cleanup_thread, &res); + if (ret != 0 || res != PTHREAD_CANCELED) { + REPLICA_NOTICELOG("pthread_join returned ret:%d res:%p for mempool cleanup thread\n", ret, res); + abort(); + } + + destroy_rcommon_deadlist(spec); + ASSERT0(get_num_entries_from_mempool(&spec->rcommon_deadlist)); destroy_mempool(&spec->rcommon_deadlist); @@ -4369,11 +4383,34 @@ destroy_volume(spec_t *spec) return; } +static void +destroy_rcommon_deadlist(spec_t *spec) +{ + int mempool_stale_entry, i; + rcommon_cmd_t *rcomm_cmd; + + mempool_stale_entry = get_num_entries_from_mempool(&spec->rcommon_deadlist); + REPLICA_NOTICELOG("Cleaning up rcommon entry:%d\n", mempool_stale_entry) + + while (mempool_stale_entry) { + rcomm_cmd = get_from_mempool(&spec->rcommon_deadlist); + + destroy_resp_list(rcomm_cmd, rcomm_cmd->copies_sent + rcomm_cmd->non_quorum_copies_sent); + for (i = 1; i < rcomm_cmd->iovcnt + 1; i++) + xfree(rcomm_cmd->iov[i].iov_base); + + free(rcomm_cmd); + + mempool_stale_entry--; + } + + return; +} + int initialize_volume(spec_t *spec, int replication_factor, int consistency_factor, int desired_replication_factor) { int rc; - pthread_t deadlist_cleanup_thread; spec->io_seq = 0; TAILQ_INIT(&spec->rcommon_waitq); @@ -4411,7 +4448,7 @@ initialize_volume(spec_t *spec, int replication_factor, int consistency_factor, return -1; } - rc = pthread_create(&deadlist_cleanup_thread, NULL, &cleanup_deadlist, + rc = pthread_create(&spec->deadlist_cleanup_thread, NULL, &cleanup_deadlist, (void *)spec); if (rc != 0) { REPLICA_ERRLOG("pthread_create(replicator_thread) failed "