Skip to content

Commit

Permalink
feat: Add explicit flush method/hook (NATIVE-111) (#670)
Browse files Browse the repository at this point in the history
Co-authored-by: Sebastian Zivota <[email protected]>
  • Loading branch information
Swatinem and loewenheim authored Jan 25, 2022
1 parent ef36a87 commit 55c07c9
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 4 deletions.
30 changes: 26 additions & 4 deletions include/sentry.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,16 @@ typedef struct sentry_options_s sentry_options_t;
* * `startup_func`: This hook will be called by sentry inside of `sentry_init`
* and instructs the transport to initialize itself. Failures will bubble up
* to `sentry_init`.
* * `flush_func`: Instructs the transport to flush its queue.
* This hook receives a millisecond-resolution `timeout` parameter and should
* return `0` if the transport queue is flushed within the timeout.
* * `shutdown_func`: Instructs the transport to flush its queue and shut down.
* This hook receives a millisecond-resolution `timeout` parameter and should
* return `true` when the transport was flushed and shut down successfully.
* In case of `false`, sentry will log an error, but continue with freeing the
* transport.
* return `0` if the transport is flushed and shut down successfully.
* In case of a non-zero return value, sentry will log an error, but continue
* with freeing the transport.
* * `free_func`: Frees the transports `state`. This hook might be called even
* though `shutdown_func` returned `false` previously.
* though `shutdown_func` returned a failure code previously.
*
* The transport interface might be extended in the future with hooks to flush
* its internal queue without shutting down, and to dump its internal queue to
Expand Down Expand Up @@ -662,6 +665,16 @@ SENTRY_API void sentry_transport_set_free_func(
SENTRY_API void sentry_transport_set_startup_func(sentry_transport_t *transport,
int (*startup_func)(const sentry_options_t *options, void *state));

/**
* Sets the transport flush hook.
*
* This hook will receive a millisecond-resolution timeout.
* It should return `0` if all the pending envelopes are
* sent within the timeout, or `1` if the timeout is hit.
*/
SENTRY_API void sentry_transport_set_flush_func(sentry_transport_t *transport,
int (*flush_func)(uint64_t timeout, void *state));

/**
* Sets the transport shutdown hook.
*
Expand Down Expand Up @@ -1046,6 +1059,15 @@ SENTRY_API uint64_t sentry_options_get_shutdown_timeout(sentry_options_t *opts);
*/
SENTRY_API int sentry_init(sentry_options_t *options);

/**
* Instructs the transport to flush its send queue.
*
* The `timeout` parameter is in milliseconds.
*
* Returns 0 on success, or a non-zero return value in case the timeout is hit.
*/
SENTRY_API int sentry_flush(uint64_t timeout);

/**
* Shuts down the sentry client and forces transports to flush out.
*
Expand Down
10 changes: 10 additions & 0 deletions src/sentry_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ sentry_init(sentry_options_t *options)
return 1;
}

int
sentry_flush(uint64_t timeout)
{
int rv = 0;
SENTRY_WITH_OPTIONS (options) {
rv = sentry__transport_flush(options->transport, timeout);
}
return rv;
}

int
sentry_close(void)
{
Expand Down
72 changes: 72 additions & 0 deletions src/sentry_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,78 @@ sentry__bgworker_start(sentry_bgworker_t *bgw)
return 0;
}

typedef struct {
long refcount;
bool was_flushed;
sentry_cond_t signal;
sentry_mutex_t lock;
} sentry_flush_task_t;

static void
sentry__flush_task(void *task_data, void *UNUSED(state))
{
sentry_flush_task_t *flush_task = (sentry_flush_task_t *)task_data;

sentry__mutex_lock(&flush_task->lock);
flush_task->was_flushed = true;
sentry__cond_wake(&flush_task->signal);
sentry__mutex_unlock(&flush_task->lock);
}

static void
sentry__flush_task_decref(sentry_flush_task_t *task)
{
if (sentry__atomic_fetch_and_add(&task->refcount, -1) == 1) {
sentry__mutex_free(&task->lock);
sentry_free(task);
}
}

int
sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
{
if (!sentry__atomic_fetch(&bgw->running)) {
SENTRY_WARN("trying to flush non-running thread");
return 0;
}
SENTRY_TRACE("flushing background worker thread");

sentry_flush_task_t *flush_task
= sentry_malloc(sizeof(sentry_flush_task_t));
if (!flush_task) {
return 1;
}
memset(flush_task, 0, sizeof(sentry_flush_task_t));
flush_task->refcount = 2; // this thread + background worker
flush_task->was_flushed = false;
sentry__cond_init(&flush_task->signal);
sentry__mutex_init(&flush_task->lock);

sentry__mutex_lock(&flush_task->lock);

/* submit the task that triggers our condvar once it runs */
sentry__bgworker_submit(bgw, sentry__flush_task,
(void (*)(void *))sentry__flush_task_decref, flush_task);

uint64_t started = sentry__monotonic_time();
bool was_flushed = false;
while (true) {
was_flushed = flush_task->was_flushed;

uint64_t now = sentry__monotonic_time();
if (was_flushed || (now > started && now - started > timeout)) {
sentry__mutex_unlock(&flush_task->lock);
sentry__flush_task_decref(flush_task);

// return `0` on success
return !was_flushed;
}

// this will implicitly release the lock, and re-acquire on wake
sentry__cond_wait_timeout(&flush_task->signal, &flush_task->lock, 250);
}
}

static void
shutdown_task(void *task_data, void *UNUSED(state))
{
Expand Down
6 changes: 6 additions & 0 deletions src/sentry_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@ void sentry__bgworker_decref(sentry_bgworker_t *bgw);
*/
int sentry__bgworker_start(sentry_bgworker_t *bgw);

/**
* This will try to flush the background worker thread queue, with a `timeout`.
* Returns 0 on success.
*/
int sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout);

/**
* This will try to shut down the background worker thread, with a `timeout`.
* Returns 0 on success.
Expand Down
18 changes: 18 additions & 0 deletions src/sentry_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ typedef struct sentry_transport_s {
void (*send_envelope_func)(sentry_envelope_t *envelope, void *state);
int (*startup_func)(const sentry_options_t *options, void *state);
int (*shutdown_func)(uint64_t timeout, void *state);
int (*flush_func)(uint64_t timeout, void *state);
void (*free_func)(void *state);
size_t (*dump_func)(sentry_run_t *run, void *state);
void *state;
Expand Down Expand Up @@ -57,6 +58,13 @@ sentry_transport_set_shutdown_func(sentry_transport_t *transport,
transport->shutdown_func = shutdown_func;
}

void
sentry_transport_set_flush_func(sentry_transport_t *transport,
int (*flush_func)(uint64_t timeout, void *state))
{
transport->flush_func = flush_func;
}

void
sentry__transport_send_envelope(
sentry_transport_t *transport, sentry_envelope_t *envelope)
Expand Down Expand Up @@ -86,6 +94,16 @@ sentry__transport_startup(
return 0;
}

int
sentry__transport_flush(sentry_transport_t *transport, uint64_t timeout)
{
if (transport->flush_func && transport->running) {
SENTRY_TRACE("flushing transport");
return transport->flush_func(timeout, transport->state);
}
return 0;
}

int
sentry__transport_shutdown(sentry_transport_t *transport, uint64_t timeout)
{
Expand Down
7 changes: 7 additions & 0 deletions src/sentry_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ void sentry__transport_send_envelope(
int sentry__transport_startup(
sentry_transport_t *transport, const sentry_options_t *options);

/**
* Instructs the transport to flush its queue.
*
* Returns 0 on success.
*/
int sentry__transport_flush(sentry_transport_t *transport, uint64_t timeout);

/**
* Instructs the transport to shut down.
*
Expand Down
8 changes: 8 additions & 0 deletions src/transports/sentry_transport_curl.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ sentry__curl_transport_start(
return sentry__bgworker_start(bgworker);
}

static int
sentry__curl_transport_flush(uint64_t timeout, void *transport_state)
{
sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state;
return sentry__bgworker_flush(bgworker, timeout);
}

static int
sentry__curl_transport_shutdown(uint64_t timeout, void *transport_state)
{
Expand Down Expand Up @@ -258,6 +265,7 @@ sentry__transport_new_default(void)
sentry_transport_set_free_func(
transport, (void (*)(void *))sentry__bgworker_decref);
sentry_transport_set_startup_func(transport, sentry__curl_transport_start);
sentry_transport_set_flush_func(transport, sentry__curl_transport_flush);
sentry_transport_set_shutdown_func(
transport, sentry__curl_transport_shutdown);
sentry__transport_set_dump_func(transport, sentry__curl_dump_queue);
Expand Down
8 changes: 8 additions & 0 deletions src/transports/sentry_transport_winhttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ sentry__winhttp_transport_start(
return sentry__bgworker_start(bgworker);
}

static int
sentry__winhttp_transport_flush(uint64_t timeout, void *transport_state)
{
sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state;
return sentry__bgworker_shutdown(bgworker, timeout);
}

static int
sentry__winhttp_transport_shutdown(uint64_t timeout, void *transport_state)
{
Expand Down Expand Up @@ -332,6 +339,7 @@ sentry__transport_new_default(void)
transport, (void (*)(void *))sentry__bgworker_decref);
sentry_transport_set_startup_func(
transport, sentry__winhttp_transport_start);
sentry_transport_set_flush_func(transport, sentry__winhttp_transport_flush);
sentry_transport_set_shutdown_func(
transport, sentry__winhttp_transport_shutdown);
sentry__transport_set_dump_func(transport, sentry__winhttp_dump_queue);
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,23 @@ SENTRY_TEST(task_queue)
// was instructed to shut down
TEST_CHECK(executed_after_shutdown);
}

SENTRY_TEST(bgworker_flush)
{
sentry_bgworker_t *bgw = sentry__bgworker_new(NULL, NULL);
sentry__bgworker_submit(bgw, sleep_task, NULL, NULL);

sentry__bgworker_start(bgw);

// first flush times out
int flush = sentry__bgworker_flush(bgw, 500);
TEST_CHECK_INT_EQUAL(flush, 1);

// second flush succeeds
flush = sentry__bgworker_flush(bgw, 1000);
TEST_CHECK_INT_EQUAL(flush, 0);

int shutdown = sentry__bgworker_shutdown(bgw, 500);
TEST_CHECK_INT_EQUAL(shutdown, 0);
sentry__bgworker_decref(bgw);
}
1 change: 1 addition & 0 deletions tests/unit/tests.inc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ XX(basic_http_request_preparation_for_transaction)
XX(basic_spans)
XX(basic_tracing_context)
XX(basic_transaction)
XX(bgworker_flush)
XX(buildid_fallback)
XX(child_spans)
XX(concurrent_init)
Expand Down

0 comments on commit 55c07c9

Please sign in to comment.