feat(bpf): use time window for bpf sampling to replace per call based sampling #1723

Open · wants to merge 2 commits into base: main
260 changes: 239 additions & 21 deletions bpf/kepler.bpf.h
@@ -167,11 +167,30 @@ SEC(".rodata.config")
__attribute__((btf_decl_tag(
"Hardware Events Enabled"))) static volatile const int HW = 1;

// The sampling rate should be disabled by default because its impact on the
// measurements is unknown.
// Global parameters for tracking periods (in milliseconds)
SEC(".rodata.config")
__attribute__((
btf_decl_tag("Sample Rate"))) static volatile const int SAMPLE_RATE = 0;
__attribute__((btf_decl_tag(
"Active Time"))) static volatile const int ACTIVE_TIME = 20;

// Global parameters for non-tracking periods (in milliseconds)
SEC(".rodata.config")
__attribute__((btf_decl_tag("Idle Time"))) static volatile const int IDLE_TIME = 80;

// BPF map to track whether we are in the tracking period or not
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
Collaborator: consider using a PERCPU_ARRAY, since this would have the effect of making the time window per-CPU as well, which may be desirable.

Contributor Author: changed to PERCPU_ARRAY

__type(key, u32);
__type(value, u32);
__uint(max_entries, 1);
} tracking_flag_map SEC(".maps");

// BPF map to store the timestamp when the tracking started
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, u64);
__uint(max_entries, 1);
} start_time_map SEC(".maps");

int counter_sched_switch = 0;
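The review thread above suggests a per-CPU flag map. A minimal sketch of that variant, assuming the declaration keeps the same name and key/value types (an illustration, not necessarily the final code in this PR):

// Per-CPU variant of the tracking flag map (sketch): each CPU then toggles
// its own active/idle window independently.
struct {
	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
	__type(key, u32);
	__type(value, u32);
	__uint(max_entries, 1);
} tracking_flag_map SEC(".maps");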

@@ -306,7 +325,7 @@ static inline void do_page_cache_hit_increment(u32 curr_pid)
process_metrics->page_cache_hit++;
}

static inline int do_kepler_sched_switch_trace(
static inline int do_kepler_sched_switch_trace_old(
u32 prev_pid, u32 next_pid, u32 prev_tgid, u32 next_tgid)
{
u32 cpu_id;
@@ -317,24 +336,31 @@

cpu_id = bpf_get_smp_processor_id();

// Skip some samples to minimize overhead
if (SAMPLE_RATE > 0) {
if (counter_sched_switch > 0) {
// update hardware counters to be used when sample is taken
if (counter_sched_switch == 1) {
collect_metrics_and_reset_counters(
&buf, prev_pid, curr_ts, cpu_id);
// Add task on-cpu running start time
bpf_map_update_elem(
&pid_time_map, &next_pid, &curr_ts,
BPF_ANY);
// create new process metrics
register_new_process_if_not_exist(next_tgid);
}
counter_sched_switch--;
// Retrieve tracking flag and start time
u32 key = 0;
u32 *tracking_flag = bpf_map_lookup_elem(&tracking_flag_map, &key);
Collaborator: given that map lookups are the most expensive part of the eBPF code, it would be better to reduce them where possible. There's no reason to store tracking_flag in a map as far as I can tell, since its value doesn't need to persist between invocations.

Contributor Author: the idea is to have the Kepler userspace program set the tracking flag; the actual mechanism is not quite clear yet. Will remove this map if that turns out to be a dead end.

u64 *start_time = bpf_map_lookup_elem(&start_time_map, &key);

if (tracking_flag && start_time) {
u64 elapsed_time = (curr_ts - *start_time) / 1000000ULL;

// Update the tracking flag based on elapsed time
if (*tracking_flag && elapsed_time >= ACTIVE_TIME) {
// Stop tracking
*tracking_flag = 0;
// Reset start time
*start_time = curr_ts;
} else if (!*tracking_flag && elapsed_time >= IDLE_TIME) {
// Start tracking
*tracking_flag = 1;
// Reset start time
*start_time = curr_ts;
}

// If we are not in the tracking period, return immediately
if (!*tracking_flag) {
return 0;
}
counter_sched_switch = SAMPLE_RATE;
}

collect_metrics_and_reset_counters(&buf, prev_pid, curr_ts, cpu_id);
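Following the review comment about reducing map lookups, one possible direction is to hold the window state in plain BPF global variables (like the existing counter_sched_switch global) instead of maps, saving two lookups per sched_switch. The sketch below is an assumption about such a follow-up, not code from this PR; note that, unlike the PERCPU_ARRAY approach, it makes the window global across CPUs:

// Sketch (assumption, not part of this PR): keep window state in BPF globals.
static u64 window_start_ns; // timestamp when the current window began
static u32 tracking_active; // 1 = active window, 0 = idle window

static inline int in_active_window(u64 curr_ts)
{
	u64 elapsed_ms = (curr_ts - window_start_ns) / 1000000ULL;

	if (tracking_active && elapsed_ms >= ACTIVE_TIME) {
		tracking_active = 0; // active window expired, go idle
		window_start_ns = curr_ts;
	} else if (!tracking_active && elapsed_ms >= IDLE_TIME) {
		tracking_active = 1; // idle window expired, resume tracking
		window_start_ns = curr_ts;
	}
	return tracking_active;
}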
@@ -378,3 +404,195 @@ bpf_map_lookup_or_try_init(void *map, const void *key, const void *init)

return bpf_map_lookup_elem(map, key);
}

typedef struct period_metrics_t {
u64 run_time_delta;
u64 cycles_delta;
u64 instr_delta;
u64 cache_miss_delta;
u64 period_duration_ns;
} period_metrics_t;

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, period_metrics_t);
__uint(max_entries, MAP_SIZE);
} min_period_metrics SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, period_metrics_t);
__uint(max_entries, MAP_SIZE);
} max_period_metrics SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, process_metrics_t);
__uint(max_entries, MAP_SIZE);
} last_sample SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, u64);
__uint(max_entries, MAP_SIZE);
} last_interpolation_ts SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__type(key, u32);
__type(value, u64);
__uint(max_entries, MAP_SIZE);
} period_start_ts SEC(".maps");

// Record per-period deltas, retain the last sample of the process metrics,
// and track the min/max deltas seen per period
static inline void update_period_statistics(struct process_metrics_t *curr_metrics, u32 tgid, u64 curr_ts) {
struct process_metrics_t *last = bpf_map_lookup_elem(&last_sample, &tgid);
struct period_metrics_t period = {0};
struct period_metrics_t *min_val, *max_val;
u64 *period_start = bpf_map_lookup_elem(&period_start_ts, &tgid);
u32 key = 0;

if (!period_start) {
bpf_map_update_elem(&period_start_ts, &tgid, &curr_ts, BPF_ANY);
return;
}

period.period_duration_ns = curr_ts - *period_start;

if (last) {
period.run_time_delta = curr_metrics->process_run_time - last->process_run_time;
period.cycles_delta = curr_metrics->cpu_cycles - last->cpu_cycles;
period.instr_delta = curr_metrics->cpu_instr - last->cpu_instr;
period.cache_miss_delta = curr_metrics->cache_miss - last->cache_miss;
} else {
period.run_time_delta = curr_metrics->process_run_time;
period.cycles_delta = curr_metrics->cpu_cycles;
period.instr_delta = curr_metrics->cpu_instr;
period.cache_miss_delta = curr_metrics->cache_miss;
}

bpf_map_update_elem(&last_sample, &tgid, curr_metrics, BPF_ANY);
bpf_map_update_elem(&period_start_ts, &tgid, &curr_ts, BPF_ANY);

min_val = bpf_map_lookup_elem(&min_period_metrics, &tgid);
max_val = bpf_map_lookup_elem(&max_period_metrics, &tgid);

if (!min_val || !max_val) {
bpf_map_update_elem(&min_period_metrics, &tgid, &period, BPF_ANY);
bpf_map_update_elem(&max_period_metrics, &tgid, &period, BPF_ANY);
return;
}

	if (period.period_duration_ns > 1000000) { // only update min/max for periods longer than 1 ms
if (period.run_time_delta < min_val->run_time_delta)
min_val->run_time_delta = period.run_time_delta;
if (period.run_time_delta > max_val->run_time_delta)
max_val->run_time_delta = period.run_time_delta;

if (period.cycles_delta < min_val->cycles_delta)
min_val->cycles_delta = period.cycles_delta;
if (period.cycles_delta > max_val->cycles_delta)
max_val->cycles_delta = period.cycles_delta;

if (period.instr_delta < min_val->instr_delta)
min_val->instr_delta = period.instr_delta;
if (period.instr_delta > max_val->instr_delta)
max_val->instr_delta = period.instr_delta;

if (period.cache_miss_delta < min_val->cache_miss_delta)
min_val->cache_miss_delta = period.cache_miss_delta;
if (period.cache_miss_delta > max_val->cache_miss_delta)
max_val->cache_miss_delta = period.cache_miss_delta;
}
}
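The min/max period deltas recorded above are consumed by interpolate_idle_metrics below, which averages them to approximate per-period activity while tracking is paused.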

// Interpolate the metrics during idle
static inline void interpolate_idle_metrics(u32 tgid, u64 curr_ts) {
struct process_metrics_t *curr_metrics = bpf_map_lookup_elem(&processes, &tgid);
struct period_metrics_t *min_val = bpf_map_lookup_elem(&min_period_metrics, &tgid);
struct period_metrics_t *max_val = bpf_map_lookup_elem(&max_period_metrics, &tgid);
u64 *last_ts = bpf_map_lookup_elem(&last_interpolation_ts, &tgid);

if (!curr_metrics || !min_val || !max_val || !last_ts)
return;

u64 time_since_last = curr_ts - *last_ts;
if (time_since_last < (IDLE_TIME * 1000000ULL))
return;

u64 avg_period_duration = (min_val->period_duration_ns + max_val->period_duration_ns) / 2;
if (avg_period_duration == 0)
return;

u64 missed_periods = time_since_last / avg_period_duration;
if (missed_periods == 0)
return;

u64 avg_runtime_delta = (min_val->run_time_delta + max_val->run_time_delta) / 2;
u64 avg_cycles_delta = (min_val->cycles_delta + max_val->cycles_delta) / 2;
u64 avg_instr_delta = (min_val->instr_delta + max_val->instr_delta) / 2;
u64 avg_cache_miss_delta = (min_val->cache_miss_delta + max_val->cache_miss_delta) / 2;

curr_metrics->process_run_time += (avg_runtime_delta * missed_periods);
curr_metrics->cpu_cycles += (avg_cycles_delta * missed_periods);
curr_metrics->cpu_instr += (avg_instr_delta * missed_periods);
curr_metrics->cache_miss += (avg_cache_miss_delta * missed_periods);

*last_ts = curr_ts;
}
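As an illustration with made-up numbers: if the averaged min/max period duration is 10 ms and a process was last interpolated 50 ms ago, missed_periods is 5, so five times the averaged run-time, cycle, instruction, and cache-miss deltas are credited to that process for the idle window.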

static inline int do_kepler_sched_switch_trace(
u32 prev_pid, u32 next_pid, u32 prev_tgid, u32 next_tgid)
{
u32 cpu_id;
u64 curr_ts = bpf_ktime_get_ns();
struct process_metrics_t *curr_tgid_metrics, *prev_tgid_metrics;
struct process_metrics_t buf = {};

cpu_id = bpf_get_smp_processor_id();

u32 key = 0;
u32 *tracking_flag = bpf_map_lookup_elem(&tracking_flag_map, &key);
u64 *start_time = bpf_map_lookup_elem(&start_time_map, &key);

if (tracking_flag && start_time) {
u64 elapsed_time = (curr_ts - *start_time) / 1000000ULL;

if (*tracking_flag && elapsed_time >= ACTIVE_TIME) {
*tracking_flag = 0;
*start_time = curr_ts;
bpf_map_update_elem(&last_interpolation_ts, &prev_tgid, &curr_ts, BPF_ANY);
} else if (!*tracking_flag && elapsed_time >= IDLE_TIME) {
*tracking_flag = 1;
*start_time = curr_ts;
}

if (!*tracking_flag) {
interpolate_idle_metrics(prev_tgid, curr_ts);
return 0;
}
}

collect_metrics_and_reset_counters(&buf, prev_pid, curr_ts, cpu_id);

if (buf.process_run_time > 0) {
prev_tgid_metrics = bpf_map_lookup_elem(&processes, &prev_tgid);
if (prev_tgid_metrics) {
prev_tgid_metrics->process_run_time += buf.process_run_time;
prev_tgid_metrics->cpu_cycles += buf.cpu_cycles;
prev_tgid_metrics->cpu_instr += buf.cpu_instr;
prev_tgid_metrics->cache_miss += buf.cache_miss;

update_period_statistics(prev_tgid_metrics, prev_tgid, curr_ts);
}
}

bpf_map_update_elem(&pid_time_map, &next_pid, &curr_ts, BPF_ANY);
register_new_process_if_not_exist(prev_tgid);

return 0;
}
8 changes: 7 additions & 1 deletion pkg/bpf/exporter.go
@@ -91,7 +91,13 @@ func (e *exporter) attach() error {

// Set program global variables
err = specs.RewriteConstants(map[string]interface{}{
"SAMPLE_RATE": int32(config.GetBPFSampleRate()),
"ACTIVE_TIME": int32(config.GetBPFActiveSampleWindowMS()),
})
if err != nil {
return fmt.Errorf("error rewriting program constants: %v", err)
}
err = specs.RewriteConstants(map[string]interface{}{
"IDLE_TIME": int32(config.GetBPFIdleSampleWindowMS()),
})
if err != nil {
return fmt.Errorf("error rewriting program constants: %v", err)
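The two RewriteConstants calls above could also be collapsed into one call with a single error path. A minimal sketch reusing the spec and config getters already shown in this diff (an alternative shape, not the code in the PR):

err = specs.RewriteConstants(map[string]interface{}{
	"ACTIVE_TIME": int32(config.GetBPFActiveSampleWindowMS()),
	"IDLE_TIME":   int32(config.GetBPFIdleSampleWindowMS()),
})
if err != nil {
	return fmt.Errorf("error rewriting program constants: %v", err)
}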
29 changes: 29 additions & 0 deletions pkg/bpf/kepler_bpfeb.go

Generated file; diff not rendered by default.

Binary file modified pkg/bpf/kepler_bpfeb.o
Binary file not shown.