Skip to content

Commit

Permalink
[PROF-8667] Heap Profiling - Part 3 - Snapshot system (#3328)
Browse files Browse the repository at this point in the history
This commit follows bb96f8f and reduces the conflict of serialization and sampling which was forcing us to potentially drop some allocation samples on the floor. It also removes the need for heap recorder to have to use a mutex by switching to a iterate-over-snapshot strategy.
  • Loading branch information
AlexJF authored Dec 21, 2023
1 parent e6c48b0 commit d4b2b3e
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 82 deletions.
7 changes: 0 additions & 7 deletions ext/ddtrace_profiling_native_extension/collectors_stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ static VALUE _native_sample(
);
static void maybe_add_placeholder_frames_omitted(VALUE thread, sampling_buffer* buffer, char *frames_omitted_message, int frames_omitted_message_size);
static void record_placeholder_stack_in_native_code(sampling_buffer* buffer, VALUE recorder_instance, sample_values values, sample_labels labels);
static void sample_thread_internal(
VALUE thread,
sampling_buffer* buffer,
VALUE recorder_instance,
sample_values values,
sample_labels labels
);

void collectors_stack_init(VALUE profiling_module) {
VALUE collectors_module = rb_define_module_under(profiling_module, "Collectors");
Expand Down
136 changes: 91 additions & 45 deletions ext/ddtrace_profiling_native_extension/heap_recorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,24 @@ struct heap_recorder {
// Map[key: heap_record_key*, record: heap_record*]
// NOTE: We always use heap_record_key.type == HEAP_STACK for storage but support lookups
// via heap_record_key.type == LOCATION_SLICE to allow for allocation-free fast-paths.
// NOTE: This table is currently only protected by the GVL since we never iterate on it
// NOTE: This table is currently only protected by the GVL since we never interact with it
// outside the GVL.
// NOTE: This table has ownership of both its heap_record_keys and heap_records.
st_table *heap_records;

// Map[obj_id: long, record: object_record*]
// NOTE: This table is currently only protected by the GVL since we never interact with it
// outside the GVL.
// NOTE: This table has ownership of its object_records. The keys are longs and so are
// passed as values.
st_table *object_records;

// Lock protecting writes to object_records.
// NOTE: heap_records is currently not protected by this one since we do not iterate on
// heap records outside the GVL.
pthread_mutex_t records_mutex;
// Map[obj_id: long, record: object_record*]
// NOTE: This is a snapshot of object_records built ahead of a iteration. Outside of an
// iteration context, this table will be NULL. During an iteration, there will be no
// mutation of the data so iteration can occur without acquiring a lock.
// NOTE: Contrary to object_records, this table has no ownership of its data.
st_table *object_records_snapshot;

// Data for a heap recording that was started but not yet ended
partial_heap_recording active_recording;
Expand All @@ -129,6 +136,7 @@ static int st_heap_record_entry_free(st_data_t, st_data_t, st_data_t);
static int st_object_record_entry_free(st_data_t, st_data_t, st_data_t);
static int st_object_record_entry_free_if_invalid(st_data_t, st_data_t, st_data_t);
static int st_object_records_iterate(st_data_t, st_data_t, st_data_t);
static int st_object_records_debug(st_data_t key, st_data_t value, st_data_t extra);
static int update_object_record_entry(st_data_t*, st_data_t*, st_data_t, int);
static void commit_allocation(heap_recorder*, heap_record*, long, live_object_data);

Expand All @@ -144,9 +152,9 @@ static void commit_allocation(heap_recorder*, heap_record*, long, live_object_da
heap_recorder* heap_recorder_new(void) {
heap_recorder *recorder = ruby_xcalloc(1, sizeof(heap_recorder));

recorder->records_mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
recorder->heap_records = st_init_table(&st_hash_type_heap_record_key);
recorder->object_records = st_init_numtable();
recorder->object_records_snapshot = NULL;
recorder->reusable_locations = ruby_xcalloc(MAX_FRAMES_LIMIT, sizeof(ddog_prof_Location));
recorder->active_recording = (partial_heap_recording) {
.obj_id = 0, // 0 is actually the obj_id of false, but we'll never track that one in heap so we use
Expand All @@ -162,16 +170,18 @@ void heap_recorder_free(heap_recorder *heap_recorder) {
return;
}

if (heap_recorder->object_records_snapshot != NULL) {
// if there's an unfinished iteration, clean it up now
// before we clean up any other state it might depend on
heap_recorder_finish_iteration(heap_recorder);
}

st_foreach(heap_recorder->object_records, st_object_record_entry_free, 0);
st_free_table(heap_recorder->object_records);

st_foreach(heap_recorder->heap_records, st_heap_record_entry_free, 0);
st_free_table(heap_recorder->heap_records);

pthread_mutex_destroy(&heap_recorder->records_mutex);

ruby_xfree(heap_recorder->reusable_locations);

ruby_xfree(heap_recorder);
}

Expand All @@ -187,21 +197,14 @@ void heap_recorder_after_fork(heap_recorder *heap_recorder) {
// This means anything the heap recorder is tracking will still be alive after the fork and
// should thus be kept. Because this heap recorder implementation does not rely on free
// tracepoints to track liveness, any frees that happen until we fully reinitialize, will
// simply be noticed on next heap_recorder_flush.
// simply be noticed on next heap_recorder_prepare_iteration.
//
// There is one small caveat though: fork only preserves one thread and in a Ruby app, that
// will be the thread holding on to the GVL. Since we support iteration on the heap recorder
// outside of the GVL (which implies acquiring the records_mutex lock), this means the child
// process may be in this weird state of having a records_mutex lock stuck in a locked
// state and that state having been caused by a thread that no longer exists.
//
// We can't blindly unlock records_mutex from the thread calling heap_recorder_after_fork
// as unlocking mutexes a thread doesn't own is undefined behaviour. What we can do is
// create a new lock and start using it from now on-forward. This is fine because at this
// point in the fork-handling logic, all tracepoints are disabled and no-one should be
// iterating on the recorder state so there are no writers/readers that may race with
// this reinitialization.
heap_recorder->records_mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
// outside of the GVL, any state specific to that interaction may be incosistent after fork
// (e.g. an acquired lock for thread safety). Iteration operates on object_records_snapshot
// though and that one will be updated on next heap_recorder_prepare_iteration so there's
// nothing for us to do here.
}

void start_heap_allocation_recording(heap_recorder *heap_recorder, VALUE new_obj, unsigned int weight) {
Expand Down Expand Up @@ -239,32 +242,42 @@ void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_pro
// not end up with a still active recording. new_obj still holds the object for this recording
active_recording->obj_id = 0;

// NOTE: This is the only path where we lookup/mutate the heap_records hash. Since this
// runs under the GVL, we can afford to interact with heap_records without getting
// the lock below.
heap_record *heap_record = get_or_create_heap_record(heap_recorder, locations);

int error = pthread_mutex_trylock(&heap_recorder->records_mutex);
if (error == EBUSY) {
// We weren't able to get a lock
// TODO: Add some queuing system so we can do something other than drop this data.
cleanup_heap_record_if_unused(heap_recorder, heap_record);
// And then commit the new allocation.
commit_allocation(heap_recorder, heap_record, obj_id, active_recording->object_data);
}

void heap_recorder_prepare_iteration(heap_recorder *heap_recorder) {
if (heap_recorder == NULL) {
return;
}
if (error) ENFORCE_SUCCESS_GVL(error);

// And then commit the new allocation.
commit_allocation(heap_recorder, heap_record, obj_id, active_recording->object_data);
if (heap_recorder->object_records_snapshot != NULL) {
// we could trivially handle this but we raise to highlight and catch unexpected usages.
rb_raise(rb_eRuntimeError, "New heap recorder iteration prepared without the previous one having been finished.");
}

ENFORCE_SUCCESS_GVL(pthread_mutex_unlock(&heap_recorder->records_mutex));
st_foreach(heap_recorder->object_records, st_object_record_entry_free_if_invalid, (st_data_t) heap_recorder);

heap_recorder->object_records_snapshot = st_copy(heap_recorder->object_records);
if (heap_recorder->object_records_snapshot == NULL) {
rb_raise(rb_eRuntimeError, "Failed to create heap snapshot.");
}
}

void heap_recorder_flush(heap_recorder *heap_recorder) {
void heap_recorder_finish_iteration(heap_recorder *heap_recorder) {
if (heap_recorder == NULL) {
return;
}

st_foreach(heap_recorder->object_records, st_object_record_entry_free_if_invalid, (st_data_t) heap_recorder);
if (heap_recorder->object_records_snapshot == NULL) {
// we could trivially handle this but we raise to highlight and catch unexpected usages.
rb_raise(rb_eRuntimeError, "Heap recorder iteration finished without having been prepared.");
}

st_free_table(heap_recorder->object_records_snapshot);
heap_recorder->object_records_snapshot = NULL;
}

// Internal data we need while performing iteration over live objects.
Expand All @@ -277,23 +290,27 @@ typedef struct {
heap_recorder *heap_recorder;
} iteration_context;

// WARN: If with_gvl = False, NO HEAP ALLOCATIONS, EXCEPTIONS or RUBY CALLS ARE ALLOWED.
void heap_recorder_for_each_live_object(
// WARN: Assume iterations can run without the GVL for performance reasons. Do not raise, allocate or
// do NoGVL-unsafe interactions with the Ruby runtime. Any such interactions should be done during
// heap_recorder_prepare_iteration or heap_recorder_finish_iteration.
bool heap_recorder_for_each_live_object(
heap_recorder *heap_recorder,
bool (*for_each_callback)(heap_recorder_iteration_data stack_data, void *extra_arg),
void *for_each_callback_extra_arg,
bool with_gvl) {
void *for_each_callback_extra_arg) {
if (heap_recorder == NULL) {
return;
return true;
}

if (heap_recorder->object_records_snapshot == NULL) {
return false;
}

ENFORCE_SUCCESS_HELPER(pthread_mutex_lock(&heap_recorder->records_mutex), with_gvl);
iteration_context context;
context.for_each_callback = for_each_callback;
context.for_each_callback_extra_arg = for_each_callback_extra_arg;
context.heap_recorder = heap_recorder;
st_foreach(heap_recorder->object_records, st_object_records_iterate, (st_data_t) &context);
ENFORCE_SUCCESS_HELPER(pthread_mutex_unlock(&heap_recorder->records_mutex), with_gvl);
st_foreach(heap_recorder->object_records_snapshot, st_object_records_iterate, (st_data_t) &context);
return true;
}

void heap_recorder_testonly_assert_hash_matches(ddog_prof_Slice_Location locations) {
Expand All @@ -317,6 +334,16 @@ void heap_recorder_testonly_assert_hash_matches(ddog_prof_Slice_Location locatio
}
}

VALUE heap_recorder_testonly_debug(heap_recorder *heap_recorder) {
if (heap_recorder == NULL) {
return rb_str_new2("NULL heap_recorder");
}

VALUE debug_str = rb_str_new2("object records:\n");
st_foreach(heap_recorder->object_records, st_object_records_debug, (st_data_t) debug_str);
return debug_str;
}

// ==========================
// Heap Recorder Internal API
// ==========================
Expand Down Expand Up @@ -383,6 +410,26 @@ static int st_object_records_iterate(DDTRACE_UNUSED st_data_t key, st_data_t val
return ST_CONTINUE;
}

static int st_object_records_debug(DDTRACE_UNUSED st_data_t key, st_data_t value, st_data_t extra) {
VALUE debug_str = (VALUE) extra;

object_record *record = (object_record*) value;

heap_frame top_frame = record->heap_record->stack->frames[0];
rb_str_catf(debug_str, "obj_id=%ld weight=%d location=%s:%d ", record->obj_id, record->object_data.weight, top_frame.filename, (int) top_frame.line);

VALUE ref;
if (!ruby_ref_from_id(LONG2NUM(record->obj_id), &ref)) {
rb_str_catf(debug_str, "object=<invalid>");
} else {
rb_str_catf(debug_str, "object=%+"PRIsVALUE, ref);
}

rb_str_catf(debug_str, "\n");

return ST_CONTINUE;
}

// Struct holding data required for an update operation on heap_records
typedef struct {
// [in] The new object record we want to add.
Expand All @@ -403,7 +450,6 @@ static int update_object_record_entry(DDTRACE_UNUSED st_data_t *key, st_data_t *
return ST_CONTINUE;
}

// WARN: Expects records_mutex to be held
static void commit_allocation(heap_recorder *heap_recorder, heap_record *heap_record, long obj_id, live_object_data object_data) {
// Update object_records
object_record_update_data update_data = (object_record_update_data) {
Expand Down
33 changes: 24 additions & 9 deletions ext/ddtrace_profiling_native_extension/heap_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,25 @@ void start_heap_allocation_recording(heap_recorder *heap_recorder, VALUE new_obj
// WARN: It is illegal to call this without previously having called ::start_heap_allocation_recording.
void end_heap_allocation_recording(heap_recorder *heap_recorder, ddog_prof_Slice_Location locations);

// Flush any intermediate state that might be queued inside the heap recorder or updates certain
// state to reflect the latest state of the VM.
// Update the heap recorder to reflect the latest state of the VM and prepare internal structures
// for efficient iteration.
//
// NOTE: This should usually be called before iteration to ensure data is as little stale as possible.
void heap_recorder_flush(heap_recorder *heap_recorder);
// WARN: This must be called strictly before iteration. Failing to do so will result in exceptions.
void heap_recorder_prepare_iteration(heap_recorder *heap_recorder);

// Optimize the heap recorder by cleaning up any data that might have been prepared specifically
// for the purpose of iterating over the heap recorder data.
//
// WARN: This must be called strictly after iteration to ensure proper cleanup and to keep the memory
// profile of the heap recorder low.
void heap_recorder_finish_iteration(heap_recorder *heap_recorder);

// Iterate over each live object being tracked by the heap recorder.
//
// NOTE: Iteration can be called without holding the Ruby Global VM lock.
// WARN: This must be called strictly after heap_recorder_prepare_iteration and before
// heap_recorder_finish_iteration.
//
// @param for_each_callback
// A callback function that shall be called for each live object being tracked
// by the heap recorder. Alongside the iteration_data for each live object,
Expand All @@ -84,14 +95,18 @@ void heap_recorder_flush(heap_recorder *heap_recorder);
// @param for_each_callback_extra_arg
// Optional (NULL if empty) extra data that should be passed to the
// callback function alongside the data for each live tracked object.
// @param with_gvl
// True if we're calling this while holding the GVL, false otherwise.
void heap_recorder_for_each_live_object(
// @return true if iteration ran, false if something prevented it from running.
bool heap_recorder_for_each_live_object(
heap_recorder *heap_recorder,
bool (*for_each_callback)(heap_recorder_iteration_data data, void* extra_arg),
void *for_each_callback_extra_arg,
bool with_gvl);
void *for_each_callback_extra_arg);

// v--- TEST-ONLY APIs ---v

// Assert internal hashing logic is valid for the provided locations and its
// corresponding internal representations in heap recorder.
void heap_recorder_testonly_assert_hash_matches(ddog_prof_Slice_Location locations);

// Returns a Ruby string with a representation of internal data helpful to
// troubleshoot issues such as unexpected test failures.
VALUE heap_recorder_testonly_debug(heap_recorder *heap_recorder);
Loading

0 comments on commit d4b2b3e

Please sign in to comment.