Skip to content

Commit

Permalink
[FIXED] Several data races found while cleaning up sanitize=thread (#771)
Browse files Browse the repository at this point in the history
* [TEST+CI only] cleanup of sanitizeThread

* Removed repeat 10

* use TSAN_OPTIONS

* PR feedback: use natsSock_Shutdown in natsConnection_Reconnect

* PR feedback

* disabled sanitize with gcc on travis again

* Fix data race in _resendSubscriptions: add a delivery worker lock

* removed a blank line

* removed TSAN extra verbosity

* fixed another race, in JetStreamSubscribeIdleHeartbeat

* experiment with double-coverage

* Fixed test_MicroAsyncErrorHandler_MaxPendingMsgs
  • Loading branch information
levb authored Jul 22, 2024
1 parent a489234 commit bd700a9
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 23 deletions.
21 changes: 16 additions & 5 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ on:
coverage:
type: string
default: "OFF"
dev_mode:
type: string
default: "OFF"
lib_msg_delivery:
type: string
default: "OFF"
Expand Down Expand Up @@ -40,6 +43,12 @@ on:
type: string
description: "Ubuntu version to use, e.g. '20.04'"
default: "latest"
verbose_test_output:
type: string
default: "OFF"
verbose_make_output:
type: string
default: "ON"
secrets:
CODECOV_TOKEN:
description: "Codecov repo token"
Expand Down Expand Up @@ -83,6 +92,12 @@ jobs:
if [[ "${{ inputs.coverage }}" == "ON" ]]; then
flags="$flags -DNATS_COVERAGE=ON"
fi
if [[ "${{ inputs.dev_mode }}" == "ON" ]]; then
flags="$flags -DDEV_MODE=ON"
fi
if [[ "${{ inputs.verbose_make_output }}" == "ON" ]]; then
flags="$flags -DCMAKE_VERBOSE_MAKEFILE=ON"
fi
echo "flags=$flags" >> $GITHUB_OUTPUT
- id: nats-vars
Expand Down Expand Up @@ -158,11 +173,7 @@ jobs:
export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"
flags=""
ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} 2>&1 | tee /tmp/out.txt
if [[ $(grep -q 'ThreadSanitizer: ' /tmp/out.txt; echo $?) == 0 ]]; then
echo "!!! ThreadSanitizer detected WARNING(s) !!!"
exit 1
fi
ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }}
- name: Upload coverage reports to Codecov
# PRs from external contributors fail: https://github.com/codecov/feedback/issues/301
Expand Down
43 changes: 29 additions & 14 deletions .github/workflows/on-pr-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,51 @@ jobs:
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

sanitize-addr:
name: "Sanitize address"
coverage-pooled:
name: "Coverage"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
coverage: ON
server_version: main
type: Debug
lib_msg_delivery: ON
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

sanitize-addr-lib-msg-delivery:
name: "Sanitize address (lib_msg_delivery)"
dev-mode:
name: "DEV_MODE"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
dev_mode: ON
server_version: main
lib_msg_delivery: ON
type: Debug
verbose_test_output: ON
verbose_make_output: ON

san-addr:
name: "Sanitize address (lib_write_deadline)"
sanitize:
name: "Sanitize"
strategy:
fail-fast: false
matrix:
compiler: [gcc, clang]
sanitize: [address, thread]
pooled_delivery: [ON, OFF]
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
server_version: main
lib_write_deadline: ON
type: Debug
compiler: ${{ matrix.compiler }}
sanitize: ${{ matrix.sanitize }}
lib_msg_delivery: ${{ matrix.pooled_delivery }}

san-thread:
name: "Sanitize thread"
san-addr-deadline:
name: "Sanitize address (lib_write_deadline)"
uses: ./.github/workflows/build-test.yml
with:
sanitize: thread
type: Debug
sanitize: address
server_version: main
lib_write_deadline: ON

Windows:
name: "Windows"
Expand Down
23 changes: 23 additions & 0 deletions .github/workflows/on-push-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,26 @@ jobs:
server_version: main
ubuntu_version: ${{ matrix.ubuntu_version }}
compiler: ${{ matrix.compiler }}

dev-mode:
name: "DEV_MODE"
uses: ./.github/workflows/build-test.yml
with:
dev_mode: ON
server_version: main
verbose_test_output: ON
verbose_make_output: ON

sanitize-addr:
name: "Sanitize address"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
server_version: main

san-thread:
name: "Sanitize thread"
uses: ./.github/workflows/build-test.yml
with:
sanitize: thread
server_version: main
22 changes: 21 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1137,17 +1137,29 @@ _resendSubscriptions(natsConnection *nc)

adjustedMax = 0;
natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
{
natsMutex_Lock(sub->libDlvWorker->lock);
}
// If JS ordered consumer, trigger a reset. Don't check the error
// condition here. If there is a failure, it will be retried
// at the next HB interval.
if ((sub->jsi != NULL) && (sub->jsi->ordered))
{
jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1);
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
continue;
}
if (natsSub_drainStarted(sub))
{
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
continue;
}
Expand All @@ -1160,6 +1172,10 @@ _resendSubscriptions(natsConnection *nc)
// messages have reached the max, if so, unsubscribe.
if (adjustedMax == 0)
{
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
s = natsConn_sendUnsubProto(nc, sub->sid, 0);
continue;
Expand All @@ -1172,6 +1188,10 @@ _resendSubscriptions(natsConnection *nc)

// Hold the lock up to that point so we are sure not to resend
// any SUB/UNSUB for a subscription that is in draining mode.
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
}

Expand Down Expand Up @@ -3435,7 +3455,7 @@ natsConnection_Reconnect(natsConnection *nc)
return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

natsSock_Close(nc->sockCtx.fd);
natsSock_Shutdown(nc->sockCtx.fd);

natsConn_Unlock(nc);
return NATS_OK;
Expand Down
20 changes: 20 additions & 0 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,10 @@ _hbTimerFired(natsTimer *timer, void* closure)
natsStatus s = NATS_OK;

natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
{
natsMutex_Lock(sub->libDlvWorker->lock);
}
alert = !jsi->active;
oc = jsi->ordered;
jsi->active = false;
Expand All @@ -2062,10 +2066,18 @@ _hbTimerFired(natsTimer *timer, void* closure)
natsCondition_Signal(sub->cond);
natsTimer_Stop(timer);
}
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
return;
}
nc = sub->conn;
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);

if (!alert)
Expand All @@ -2075,12 +2087,20 @@ _hbTimerFired(natsTimer *timer, void* closure)
if (oc)
{
natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
{
natsMutex_Lock(sub->libDlvWorker->lock);
}
if (!sub->closed)
{
// If we fail in that call, we will report to async err callback
// (if one is specified).
s = jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1);
}
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
}

Expand Down
7 changes: 6 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ file(STRINGS list.txt listOfTestNames)

# For each test name
foreach(name ${listOfTestNames})

# Create a test and pass the index (start and end are the same)
# to the testsuite executable
add_test(NAME Test_${name}
Expand All @@ -45,6 +44,12 @@ foreach(name ${listOfTestNames})
# Make sure the test passes
set_tests_properties(Test_${name} PROPERTIES PASS_REGULAR_EXPRESSION "ALL PASSED")

# Set TSAN_OPTIONS in the test's environment when NATS_SANITIZE is set
if(NATS_SANITIZE)
set_tests_properties(Test_${name} PROPERTIES
ENVIRONMENT "TSAN_OPTIONS=detect_deadlocks=1:second_deadlock_stack=1:halt_on_error=1:report_signal_unsafe=1")
endif(NATS_SANITIZE)

# Bump the test index number
math(EXPR testIndex "${testIndex}+1")
endforeach()
Expand Down
15 changes: 13 additions & 2 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -5477,6 +5477,7 @@ test_natsSrvVersionAtLeast(void)
{
s = NATS_ERR;
}
natsConn_Unlock(nc);
}
}
testCond(s == NATS_OK);
Expand Down Expand Up @@ -13337,7 +13338,10 @@ test_ClientAutoUnsubAndReconnect(void)
nats_Sleep(10);

test("Received no more than max: ");
testCond((s == NATS_OK) && (arg.sum == 10));
natsMutex_Lock(arg.m);
int sum = arg.sum;
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK) && (sum == 10));

natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
Expand Down Expand Up @@ -13390,6 +13394,7 @@ test_AutoUnsubNoUnsubOnDestroy(void)
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.done)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

natsConnection_Destroy(nc);
Expand Down Expand Up @@ -20252,6 +20257,7 @@ test_ForcedReconnect(void)
CHECK_SERVER_STARTED(pid);
IFOK(s, natsOptions_Create(&opts));
IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg));
IFOK(s, natsOptions_SetReconnectWait(opts, 100));
IFOK(s, natsConnection_Connect(&nc, opts));
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
testCond(s == NATS_OK);
Expand Down Expand Up @@ -20800,6 +20806,7 @@ test_EventLoop(void)
natsMutex_Lock(arg.m);
if (arg.attached != 2 || !arg.detached)
s = NATS_ERR;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

natsSubscription_Destroy(sub);
Expand Down Expand Up @@ -29485,6 +29492,9 @@ _jsOrderedErrHandler(natsConnection *nc, natsSubscription *subscription, natsSta
{
struct threadArg *args = (struct threadArg*) closure;

if (err != NATS_MISSED_HEARTBEAT)
return;

natsMutex_Lock(args->m);
args->status = err;
natsCondition_Signal(args->c);
Expand Down Expand Up @@ -29824,6 +29834,7 @@ test_JetStreamOrderedConsSrvRestart(void)
natsMutex_Lock(args.m);
while ((s != NATS_TIMEOUT) && !args.reconnected)
s = natsCondition_TimedWait(args.c, args.m, 2000);
natsMutex_Unlock(args.m);
testCond(s == NATS_OK);

test("Send 1 message: ");
Expand Down Expand Up @@ -33921,8 +33932,8 @@ test_MicroAsyncErrorHandler_MaxPendingMsgs(void)
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.closed)
s = natsCondition_TimedWait(arg.c, arg.m, 1000);
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER));
natsMutex_Unlock(arg.m);

microService_Destroy(m);
_waitForMicroservicesAllDone(&arg);
Expand Down

0 comments on commit bd700a9

Please sign in to comment.