From 26ef45a6878c0dd0889fcbb8544e831747b14041 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 25 Jan 2022 12:14:01 +0100 Subject: [PATCH 1/5] WIP: flush transport --- include/sentry.h | 30 ++++++++-- src/sentry_core.c | 10 ++++ src/sentry_sync.c | 68 +++++++++++++++++++++++ src/sentry_sync.h | 6 ++ src/sentry_transport.c | 18 ++++++ src/sentry_transport.h | 7 +++ src/transports/sentry_transport_curl.c | 8 +++ src/transports/sentry_transport_winhttp.c | 10 ++++ tests/unit/test_sync.c | 21 +++++++ tests/unit/tests.inc | 1 + 10 files changed, 175 insertions(+), 4 deletions(-) diff --git a/include/sentry.h b/include/sentry.h index 2154d82b2..9f82e7509 100644 --- a/include/sentry.h +++ b/include/sentry.h @@ -616,13 +616,16 @@ typedef struct sentry_options_s sentry_options_t; * * `startup_func`: This hook will be called by sentry inside of `sentry_init` * and instructs the transport to initialize itself. Failures will bubble up * to `sentry_init`. + * * `flush_func`: Instructs the transport to flush its queue. + * This hook receives a millisecond-resolution `timeout` parameter and should + * return `0` when the transport queue was flushed within the timeout. * * `shutdown_func`: Instructs the transport to flush its queue and shut down. * This hook receives a millisecond-resolution `timeout` parameter and should - * return `true` when the transport was flushed and shut down successfully. - * In case of `false`, sentry will log an error, but continue with freeing the - * transport. + * return `0` when the transport was flushed and shut down successfully. + * In case of a non-zero return value, sentry will log an error, but continue + * with freeing the transport. * * `free_func`: Frees the transports `state`. This hook might be called even - * though `shutdown_func` returned `false` previously. + * though `shutdown_func` returned a failure code previously. * * The transport interface might be extended in the future with hooks to flush * its internal queue without shutting down, and to dump its internal queue to @@ -662,6 +665,16 @@ SENTRY_API void sentry_transport_set_free_func( SENTRY_API void sentry_transport_set_startup_func(sentry_transport_t *transport, int (*startup_func)(const sentry_options_t *options, void *state)); +/** + * Sets the transport flush hook. + * + * This hook will receive a millisecond-resolution timeout. + * It should return `0` on success in case all the pending envelopes have been + * sent within the timeout, or `1` if the timeout was hit. + */ +SENTRY_API void sentry_transport_set_flush_func(sentry_transport_t *transport, + int (*flush_func)(uint64_t timeout, void *state)); + /** * Sets the transport shutdown hook. * @@ -1046,6 +1059,15 @@ SENTRY_API uint64_t sentry_options_get_shutdown_timeout(sentry_options_t *opts); */ SENTRY_API int sentry_init(sentry_options_t *options); +/** + * Instructs the transport to flush its send queue. + * + * The `timeout` parameter is in millisecond-resolution. + * + * Returns 0 on success, or a non-zero return value in case the timeout was hit. + */ +SENTRY_API int sentry_flush(uint64_t timeout); + /** * Shuts down the sentry client and forces transports to flush out. * diff --git a/src/sentry_core.c b/src/sentry_core.c index 712e5c7c6..1c0a822c6 100644 --- a/src/sentry_core.c +++ b/src/sentry_core.c @@ -200,6 +200,16 @@ sentry_init(sentry_options_t *options) return 1; } +int +sentry_flush(uint64_t timeout) +{ + int rv = 0; + SENTRY_WITH_OPTIONS (options) { + rv = sentry__transport_flush(options->transport, timeout); + } + return rv; +} + int sentry_close(void) { diff --git a/src/sentry_sync.c b/src/sentry_sync.c index b2e566c4b..2964d198d 100644 --- a/src/sentry_sync.c +++ b/src/sentry_sync.c @@ -281,6 +281,74 @@ sentry__bgworker_start(sentry_bgworker_t *bgw) return 0; } +typedef struct { + long refcount; + bool was_flushed; + sentry_cond_t signal; + sentry_mutex_t lock; +} sentry_flush_task_t; + +static void +sentry__flush_task(void *task_data, void *UNUSED(state)) +{ + sentry_flush_task_t *flush_task = (sentry_flush_task_t *)task_data; + fprintf(stderr, "locking on bg\n"); + sentry__mutex_lock(&flush_task->lock); + flush_task->was_flushed = true; + fprintf(stderr, "waking on bg\n"); + sentry__cond_wake(&flush_task->signal); + fprintf(stderr, "unlocking on bg\n"); + sentry__mutex_unlock(&flush_task->lock); +} + +static void +sentry__flush_task_decref(sentry_flush_task_t *task) +{ + if (sentry__atomic_fetch_and_add(&task->refcount, -1) == 1) { + sentry_free(task); + } +} + +int +sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout) +{ + if (!sentry__atomic_fetch(&bgw->running)) { + SENTRY_WARN("trying to flush non-running thread"); + return 0; + } + SENTRY_TRACE("flushing background worker thread"); + + sentry_flush_task_t *flush_task + = sentry_malloc(sizeof(sentry_flush_task_t)); + if (!flush_task) { + return 1; + } + flush_task->refcount = 2; // this thread + background worker + flush_task->was_flushed = false; + sentry__cond_init(&flush_task->signal); + sentry__mutex_init(&flush_task->lock); + + fprintf(stderr, "locking on main\n"); + sentry__mutex_lock(&flush_task->lock); + /* submit the task that triggers our condvar once it runs */ + fprintf(stderr, "submit on main\n"); + sentry__bgworker_submit( + bgw, sentry__flush_task, sentry__flush_task_decref, flush_task); + + // this will implicitly release the lock, and re-acquire on wake + fprintf(stderr, "wait on main\n"); + sentry__cond_wait_timeout(&flush_task->signal, &flush_task->lock, timeout); + + bool was_flushed = flush_task->was_flushed; + + fprintf(stderr, "unlock on main\n"); + sentry__mutex_unlock(&flush_task->lock); + sentry__flush_task_decref(flush_task); + + // return `0` on success + return !was_flushed; +} + static void shutdown_task(void *task_data, void *UNUSED(state)) { diff --git a/src/sentry_sync.h b/src/sentry_sync.h index 96e353b86..11915536a 100644 --- a/src/sentry_sync.h +++ b/src/sentry_sync.h @@ -394,6 +394,12 @@ void sentry__bgworker_decref(sentry_bgworker_t *bgw); */ int sentry__bgworker_start(sentry_bgworker_t *bgw); +/** + * This will try to flush the background worker thread queue, with a `timeout`. + * Returns 0 on success. + */ +int sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout); + /** * This will try to shut down the background worker thread, with a `timeout`. * Returns 0 on success. diff --git a/src/sentry_transport.c b/src/sentry_transport.c index 351ddca0b..51f8e9b39 100644 --- a/src/sentry_transport.c +++ b/src/sentry_transport.c @@ -12,6 +12,7 @@ typedef struct sentry_transport_s { void (*send_envelope_func)(sentry_envelope_t *envelope, void *state); int (*startup_func)(const sentry_options_t *options, void *state); int (*shutdown_func)(uint64_t timeout, void *state); + int (*flush_func)(uint64_t timeout, void *state); void (*free_func)(void *state); size_t (*dump_func)(sentry_run_t *run, void *state); void *state; @@ -57,6 +58,13 @@ sentry_transport_set_shutdown_func(sentry_transport_t *transport, transport->shutdown_func = shutdown_func; } +void +sentry_transport_set_flush_func(sentry_transport_t *transport, + int (*flush_func)(uint64_t timeout, void *state)) +{ + transport->flush_func = flush_func; +} + void sentry__transport_send_envelope( sentry_transport_t *transport, sentry_envelope_t *envelope) @@ -86,6 +94,16 @@ sentry__transport_startup( return 0; } +int +sentry__transport_flush(sentry_transport_t *transport, uint64_t timeout) +{ + if (transport->flush_func && transport->running) { + SENTRY_TRACE("flushing transport"); + return transport->flush_func(timeout, transport->state); + } + return 0; +} + int sentry__transport_shutdown(sentry_transport_t *transport, uint64_t timeout) { diff --git a/src/sentry_transport.h b/src/sentry_transport.h index 2ab786d10..f30788107 100644 --- a/src/sentry_transport.h +++ b/src/sentry_transport.h @@ -31,6 +31,13 @@ void sentry__transport_send_envelope( int sentry__transport_startup( sentry_transport_t *transport, const sentry_options_t *options); +/** + * Instructs the transport to flush its queue. + * + * Returns 0 on success. + */ +int sentry__transport_flush(sentry_transport_t *transport, uint64_t timeout); + /** * Instructs the transport to shut down. * diff --git a/src/transports/sentry_transport_curl.c b/src/transports/sentry_transport_curl.c index 19ca7c995..31605073f 100644 --- a/src/transports/sentry_transport_curl.c +++ b/src/transports/sentry_transport_curl.c @@ -89,6 +89,13 @@ sentry__curl_transport_start( return sentry__bgworker_start(bgworker); } +static int +sentry__curl_transport_flush(uint64_t timeout, void *transport_state) +{ + sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state; + return sentry__bgworker_flush(bgworker, timeout); +} + static int sentry__curl_transport_shutdown(uint64_t timeout, void *transport_state) { @@ -258,6 +265,7 @@ sentry__transport_new_default(void) sentry_transport_set_free_func( transport, (void (*)(void *))sentry__bgworker_decref); sentry_transport_set_startup_func(transport, sentry__curl_transport_start); + sentry_transport_set_flush_func(transport, sentry__curl_transport_flush); sentry_transport_set_shutdown_func( transport, sentry__curl_transport_shutdown); sentry__transport_set_dump_func(transport, sentry__curl_dump_queue); diff --git a/src/transports/sentry_transport_winhttp.c b/src/transports/sentry_transport_winhttp.c index 5bfdf8171..b1709ffbb 100644 --- a/src/transports/sentry_transport_winhttp.c +++ b/src/transports/sentry_transport_winhttp.c @@ -107,6 +107,15 @@ sentry__winhttp_transport_start( return sentry__bgworker_start(bgworker); } +static int +sentry__winhttp_transport_flush(uint64_t timeout, void *transport_state) +{ + sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state; + winhttp_bgworker_state_t *state = sentry__bgworker_get_state(bgworker); + + return sentry__bgworker_shutdown(bgworker, timeout); +} + static int sentry__winhttp_transport_shutdown(uint64_t timeout, void *transport_state) { @@ -332,6 +341,7 @@ sentry__transport_new_default(void) transport, (void (*)(void *))sentry__bgworker_decref); sentry_transport_set_startup_func( transport, sentry__winhttp_transport_start); + sentry_transport_set_flush_func(transport, sentry__winhttp_transport_flush); sentry_transport_set_shutdown_func( transport, sentry__winhttp_transport_shutdown); sentry__transport_set_dump_func(transport, sentry__winhttp_dump_queue); diff --git a/tests/unit/test_sync.c b/tests/unit/test_sync.c index a90a86206..d265508aa 100644 --- a/tests/unit/test_sync.c +++ b/tests/unit/test_sync.c @@ -131,3 +131,24 @@ SENTRY_TEST(task_queue) // was instructed to shut down TEST_CHECK(executed_after_shutdown); } + +SENTRY_TEST(bgworker_flush) +{ + return; + sentry_bgworker_t *bgw = sentry__bgworker_new(NULL, NULL); + sentry__bgworker_submit(bgw, sleep_task, NULL, NULL); + + sentry__bgworker_start(bgw); + + // first flush times out + int flush = sentry__bgworker_flush(bgw, 500); + TEST_CHECK_INT_EQUAL(flush, 1); + + // second flush succeeds + flush = sentry__bgworker_flush(bgw, 1000); + TEST_CHECK_INT_EQUAL(flush, 0); + + int shutdown = sentry__bgworker_shutdown(bgw, 500); + TEST_CHECK_INT_EQUAL(shutdown, 0); + sentry__bgworker_decref(bgw); +} diff --git a/tests/unit/tests.inc b/tests/unit/tests.inc index 7114022d5..2f54b3e74 100644 --- a/tests/unit/tests.inc +++ b/tests/unit/tests.inc @@ -9,6 +9,7 @@ XX(basic_http_request_preparation_for_transaction) XX(basic_spans) XX(basic_tracing_context) XX(basic_transaction) +XX(bgworker_flush) XX(buildid_fallback) XX(child_spans) XX(concurrent_init) From f67bd97fcca4b6fc01a1df69c392c16d4ac9e3a9 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 25 Jan 2022 16:12:07 +0100 Subject: [PATCH 2/5] flush on mac --- src/sentry_sync.c | 35 +++++++++++++++++++---------------- tests/unit/test_sync.c | 1 - 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/sentry_sync.c b/src/sentry_sync.c index 2964d198d..f23dbb8a4 100644 --- a/src/sentry_sync.c +++ b/src/sentry_sync.c @@ -292,12 +292,10 @@ static void sentry__flush_task(void *task_data, void *UNUSED(state)) { sentry_flush_task_t *flush_task = (sentry_flush_task_t *)task_data; - fprintf(stderr, "locking on bg\n"); + sentry__mutex_lock(&flush_task->lock); flush_task->was_flushed = true; - fprintf(stderr, "waking on bg\n"); sentry__cond_wake(&flush_task->signal); - fprintf(stderr, "unlocking on bg\n"); sentry__mutex_unlock(&flush_task->lock); } @@ -305,6 +303,7 @@ static void sentry__flush_task_decref(sentry_flush_task_t *task) { if (sentry__atomic_fetch_and_add(&task->refcount, -1) == 1) { + sentry__mutex_free(&task->lock); sentry_free(task); } } @@ -328,25 +327,29 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout) sentry__cond_init(&flush_task->signal); sentry__mutex_init(&flush_task->lock); - fprintf(stderr, "locking on main\n"); sentry__mutex_lock(&flush_task->lock); + /* submit the task that triggers our condvar once it runs */ - fprintf(stderr, "submit on main\n"); - sentry__bgworker_submit( - bgw, sentry__flush_task, sentry__flush_task_decref, flush_task); + sentry__bgworker_submit(bgw, sentry__flush_task, + (void (*)(void *))sentry__flush_task_decref, flush_task); - // this will implicitly release the lock, and re-acquire on wake - fprintf(stderr, "wait on main\n"); - sentry__cond_wait_timeout(&flush_task->signal, &flush_task->lock, timeout); + uint64_t started = sentry__monotonic_time(); + bool was_flushed = false; + while (true) { + was_flushed = flush_task->was_flushed; - bool was_flushed = flush_task->was_flushed; + uint64_t now = sentry__monotonic_time(); + if (was_flushed || (now > started && now - started > timeout)) { + sentry__mutex_unlock(&flush_task->lock); + sentry__flush_task_decref(flush_task); - fprintf(stderr, "unlock on main\n"); - sentry__mutex_unlock(&flush_task->lock); - sentry__flush_task_decref(flush_task); + // return `0` on success + return !was_flushed; + } - // return `0` on success - return !was_flushed; + // this will implicitly release the lock, and re-acquire on wake + sentry__cond_wait_timeout(&flush_task->signal, &flush_task->lock, 250); + } } static void diff --git a/tests/unit/test_sync.c b/tests/unit/test_sync.c index d265508aa..aba8e8290 100644 --- a/tests/unit/test_sync.c +++ b/tests/unit/test_sync.c @@ -134,7 +134,6 @@ SENTRY_TEST(task_queue) SENTRY_TEST(bgworker_flush) { - return; sentry_bgworker_t *bgw = sentry__bgworker_new(NULL, NULL); sentry__bgworker_submit(bgw, sleep_task, NULL, NULL); From 45fd36dc535234b37fa56c5d4bd76406fd55c7b6 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 25 Jan 2022 16:31:03 +0100 Subject: [PATCH 3/5] init flush task memory --- src/sentry_sync.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry_sync.c b/src/sentry_sync.c index f23dbb8a4..a6b6380a1 100644 --- a/src/sentry_sync.c +++ b/src/sentry_sync.c @@ -322,6 +322,7 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout) if (!flush_task) { return 1; } + memset(flush_task, 0, sizeof(sentry_flush_task_t)); flush_task->refcount = 2; // this thread + background worker flush_task->was_flushed = false; sentry__cond_init(&flush_task->signal); From bc9c132220116104203bb4967a5a6f7042d8c5d2 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 25 Jan 2022 16:36:17 +0100 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Sebastian Zivota --- include/sentry.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/include/sentry.h b/include/sentry.h index 9f82e7509..1ae610590 100644 --- a/include/sentry.h +++ b/include/sentry.h @@ -618,10 +618,10 @@ typedef struct sentry_options_s sentry_options_t; * to `sentry_init`. * * `flush_func`: Instructs the transport to flush its queue. * This hook receives a millisecond-resolution `timeout` parameter and should - * return `0` when the transport queue was flushed within the timeout. + * return `0` if the transport queue is flushed within the timeout. * * `shutdown_func`: Instructs the transport to flush its queue and shut down. * This hook receives a millisecond-resolution `timeout` parameter and should - * return `0` when the transport was flushed and shut down successfully. + * return `0` if the transport is flushed and shut down successfully. * In case of a non-zero return value, sentry will log an error, but continue * with freeing the transport. * * `free_func`: Frees the transports `state`. This hook might be called even @@ -669,8 +669,8 @@ SENTRY_API void sentry_transport_set_startup_func(sentry_transport_t *transport, * Sets the transport flush hook. * * This hook will receive a millisecond-resolution timeout. - * It should return `0` on success in case all the pending envelopes have been - * sent within the timeout, or `1` if the timeout was hit. + * It should return `0` if all the pending envelopes are + * sent within the timeout, or `1` if the timeout is hit. */ SENTRY_API void sentry_transport_set_flush_func(sentry_transport_t *transport, int (*flush_func)(uint64_t timeout, void *state)); @@ -1062,9 +1062,9 @@ SENTRY_API int sentry_init(sentry_options_t *options); /** * Instructs the transport to flush its send queue. * - * The `timeout` parameter is in millisecond-resolution. + * The `timeout` parameter is in milliseconds. * - * Returns 0 on success, or a non-zero return value in case the timeout was hit. + * Returns 0 on success, or a non-zero return value in case the timeout is hit. */ SENTRY_API int sentry_flush(uint64_t timeout); From cc6c1ddcabc077276861e4c8edbfca8117a4503e Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 25 Jan 2022 16:42:48 +0100 Subject: [PATCH 5/5] rm unused variable --- src/transports/sentry_transport_winhttp.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/transports/sentry_transport_winhttp.c b/src/transports/sentry_transport_winhttp.c index b1709ffbb..ef9e74b67 100644 --- a/src/transports/sentry_transport_winhttp.c +++ b/src/transports/sentry_transport_winhttp.c @@ -111,8 +111,6 @@ static int sentry__winhttp_transport_flush(uint64_t timeout, void *transport_state) { sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state; - winhttp_bgworker_state_t *state = sentry__bgworker_get_state(bgworker); - return sentry__bgworker_shutdown(bgworker, timeout); }