Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add explicit flush method/hook (NATIVE-111) #670

Merged
merged 5 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions include/sentry.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,16 @@ typedef struct sentry_options_s sentry_options_t;
* * `startup_func`: This hook will be called by sentry inside of `sentry_init`
* and instructs the transport to initialize itself. Failures will bubble up
* to `sentry_init`.
* * `flush_func`: Instructs the transport to flush its queue.
* This hook receives a millisecond-resolution `timeout` parameter and should
* return `0` when the transport queue was flushed within the timeout.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
* * `shutdown_func`: Instructs the transport to flush its queue and shut down.
* This hook receives a millisecond-resolution `timeout` parameter and should
* return `true` when the transport was flushed and shut down successfully.
* In case of `false`, sentry will log an error, but continue with freeing the
* transport.
* return `0` when the transport was flushed and shut down successfully.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
* In case of a non-zero return value, sentry will log an error, but continue
* with freeing the transport.
* * `free_func`: Frees the transports `state`. This hook might be called even
* though `shutdown_func` returned `false` previously.
* though `shutdown_func` returned a failure code previously.
*
* The transport interface might be extended in the future with hooks to flush
* its internal queue without shutting down, and to dump its internal queue to
Expand Down Expand Up @@ -662,6 +665,16 @@ SENTRY_API void sentry_transport_set_free_func(
SENTRY_API void sentry_transport_set_startup_func(sentry_transport_t *transport,
int (*startup_func)(const sentry_options_t *options, void *state));

/**
* Sets the transport flush hook.
*
* This hook will receive a millisecond-resolution timeout.
* It should return `0` on success in case all the pending envelopes have been
* sent within the timeout, or `1` if the timeout was hit.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
*/
SENTRY_API void sentry_transport_set_flush_func(sentry_transport_t *transport,
int (*flush_func)(uint64_t timeout, void *state));

/**
* Sets the transport shutdown hook.
*
Expand Down Expand Up @@ -1046,6 +1059,15 @@ SENTRY_API uint64_t sentry_options_get_shutdown_timeout(sentry_options_t *opts);
*/
SENTRY_API int sentry_init(sentry_options_t *options);

/**
* Instructs the transport to flush its send queue.
*
* The `timeout` parameter is in millisecond-resolution.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
*
* Returns 0 on success, or a non-zero return value in case the timeout was hit.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
*/
SENTRY_API int sentry_flush(uint64_t timeout);

/**
* Shuts down the sentry client and forces transports to flush out.
*
Expand Down
10 changes: 10 additions & 0 deletions src/sentry_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ sentry_init(sentry_options_t *options)
return 1;
}

int
sentry_flush(uint64_t timeout)
{
int rv = 0;
SENTRY_WITH_OPTIONS (options) {
rv = sentry__transport_flush(options->transport, timeout);
}
return rv;
}

int
sentry_close(void)
{
Expand Down
71 changes: 71 additions & 0 deletions src/sentry_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,77 @@ sentry__bgworker_start(sentry_bgworker_t *bgw)
return 0;
}

typedef struct {
long refcount;
bool was_flushed;
sentry_cond_t signal;
sentry_mutex_t lock;
} sentry_flush_task_t;

static void
sentry__flush_task(void *task_data, void *UNUSED(state))
{
sentry_flush_task_t *flush_task = (sentry_flush_task_t *)task_data;

sentry__mutex_lock(&flush_task->lock);
flush_task->was_flushed = true;
sentry__cond_wake(&flush_task->signal);
sentry__mutex_unlock(&flush_task->lock);
}

static void
sentry__flush_task_decref(sentry_flush_task_t *task)
{
if (sentry__atomic_fetch_and_add(&task->refcount, -1) == 1) {
sentry__mutex_free(&task->lock);
sentry_free(task);
}
}

int
sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
{
if (!sentry__atomic_fetch(&bgw->running)) {
SENTRY_WARN("trying to flush non-running thread");
return 0;
}
SENTRY_TRACE("flushing background worker thread");

sentry_flush_task_t *flush_task
= sentry_malloc(sizeof(sentry_flush_task_t));
if (!flush_task) {
return 1;
}
flush_task->refcount = 2; // this thread + background worker
flush_task->was_flushed = false;
sentry__cond_init(&flush_task->signal);
sentry__mutex_init(&flush_task->lock);

sentry__mutex_lock(&flush_task->lock);

/* submit the task that triggers our condvar once it runs */
sentry__bgworker_submit(bgw, sentry__flush_task,
(void (*)(void *))sentry__flush_task_decref, flush_task);

uint64_t started = sentry__monotonic_time();
bool was_flushed = false;
while (true) {
was_flushed = flush_task->was_flushed;

uint64_t now = sentry__monotonic_time();
if (was_flushed || (now > started && now - started > timeout)) {
loewenheim marked this conversation as resolved.
Show resolved Hide resolved
sentry__mutex_unlock(&flush_task->lock);
sentry__flush_task_decref(flush_task);

// return `0` on success
return !was_flushed;
}

// this will implicitly release the lock, and re-acquire on wake
sentry__cond_wait_timeout(&flush_task->signal, &flush_task->lock, 250);
}
}

static void
shutdown_task(void *task_data, void *UNUSED(state))
{
Expand Down
6 changes: 6 additions & 0 deletions src/sentry_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@ void sentry__bgworker_decref(sentry_bgworker_t *bgw);
*/
int sentry__bgworker_start(sentry_bgworker_t *bgw);

/**
* This will try to flush the background worker thread queue, with a `timeout`.
* Returns 0 on success.
*/
int sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout);

/**
* This will try to shut down the background worker thread, with a `timeout`.
* Returns 0 on success.
Expand Down
18 changes: 18 additions & 0 deletions src/sentry_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ typedef struct sentry_transport_s {
void (*send_envelope_func)(sentry_envelope_t *envelope, void *state);
int (*startup_func)(const sentry_options_t *options, void *state);
int (*shutdown_func)(uint64_t timeout, void *state);
int (*flush_func)(uint64_t timeout, void *state);
void (*free_func)(void *state);
size_t (*dump_func)(sentry_run_t *run, void *state);
void *state;
Expand Down Expand Up @@ -57,6 +58,13 @@ sentry_transport_set_shutdown_func(sentry_transport_t *transport,
transport->shutdown_func = shutdown_func;
}

void
sentry_transport_set_flush_func(sentry_transport_t *transport,
int (*flush_func)(uint64_t timeout, void *state))
{
transport->flush_func = flush_func;
}

void
sentry__transport_send_envelope(
sentry_transport_t *transport, sentry_envelope_t *envelope)
Expand Down Expand Up @@ -86,6 +94,16 @@ sentry__transport_startup(
return 0;
}

int
sentry__transport_flush(sentry_transport_t *transport, uint64_t timeout)
{
if (transport->flush_func && transport->running) {
SENTRY_TRACE("flushing transport");
return transport->flush_func(timeout, transport->state);
}
return 0;
}

int
sentry__transport_shutdown(sentry_transport_t *transport, uint64_t timeout)
{
Expand Down
7 changes: 7 additions & 0 deletions src/sentry_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ void sentry__transport_send_envelope(
int sentry__transport_startup(
sentry_transport_t *transport, const sentry_options_t *options);

/**
* Instructs the transport to flush its queue.
*
* Returns 0 on success.
*/
int sentry__transport_flush(sentry_transport_t *transport, uint64_t timeout);

/**
* Instructs the transport to shut down.
*
Expand Down
8 changes: 8 additions & 0 deletions src/transports/sentry_transport_curl.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ sentry__curl_transport_start(
return sentry__bgworker_start(bgworker);
}

static int
sentry__curl_transport_flush(uint64_t timeout, void *transport_state)
{
sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state;
return sentry__bgworker_flush(bgworker, timeout);
}

static int
sentry__curl_transport_shutdown(uint64_t timeout, void *transport_state)
{
Expand Down Expand Up @@ -258,6 +265,7 @@ sentry__transport_new_default(void)
sentry_transport_set_free_func(
transport, (void (*)(void *))sentry__bgworker_decref);
sentry_transport_set_startup_func(transport, sentry__curl_transport_start);
sentry_transport_set_flush_func(transport, sentry__curl_transport_flush);
sentry_transport_set_shutdown_func(
transport, sentry__curl_transport_shutdown);
sentry__transport_set_dump_func(transport, sentry__curl_dump_queue);
Expand Down
10 changes: 10 additions & 0 deletions src/transports/sentry_transport_winhttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ sentry__winhttp_transport_start(
return sentry__bgworker_start(bgworker);
}

static int
sentry__winhttp_transport_flush(uint64_t timeout, void *transport_state)
{
sentry_bgworker_t *bgworker = (sentry_bgworker_t *)transport_state;
winhttp_bgworker_state_t *state = sentry__bgworker_get_state(bgworker);

return sentry__bgworker_shutdown(bgworker, timeout);
}

static int
sentry__winhttp_transport_shutdown(uint64_t timeout, void *transport_state)
{
Expand Down Expand Up @@ -332,6 +341,7 @@ sentry__transport_new_default(void)
transport, (void (*)(void *))sentry__bgworker_decref);
sentry_transport_set_startup_func(
transport, sentry__winhttp_transport_start);
sentry_transport_set_flush_func(transport, sentry__winhttp_transport_flush);
sentry_transport_set_shutdown_func(
transport, sentry__winhttp_transport_shutdown);
sentry__transport_set_dump_func(transport, sentry__winhttp_dump_queue);
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,23 @@ SENTRY_TEST(task_queue)
// was instructed to shut down
TEST_CHECK(executed_after_shutdown);
}

SENTRY_TEST(bgworker_flush)
{
sentry_bgworker_t *bgw = sentry__bgworker_new(NULL, NULL);
sentry__bgworker_submit(bgw, sleep_task, NULL, NULL);

sentry__bgworker_start(bgw);

// first flush times out
int flush = sentry__bgworker_flush(bgw, 500);
TEST_CHECK_INT_EQUAL(flush, 1);

// second flush succeeds
flush = sentry__bgworker_flush(bgw, 1000);
TEST_CHECK_INT_EQUAL(flush, 0);

int shutdown = sentry__bgworker_shutdown(bgw, 500);
TEST_CHECK_INT_EQUAL(shutdown, 0);
sentry__bgworker_decref(bgw);
}
1 change: 1 addition & 0 deletions tests/unit/tests.inc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ XX(basic_http_request_preparation_for_transaction)
XX(basic_spans)
XX(basic_tracing_context)
XX(basic_transaction)
XX(bgworker_flush)
XX(buildid_fallback)
XX(child_spans)
XX(concurrent_init)
Expand Down