server: concurrency fix + monitoring - add /metrics prometheus compatible endpoint (#5708)

* server: monitoring - add /metrics prometheus compatible endpoint

* server: fix concurrency issue where, when 2 tasks are waiting for results, only one caller thread is notified

* server: metrics - move to a dedicated struct
phymbert authored Feb 25, 2024
1 parent 1289408 commit d52d781
Showing 7 changed files with 191 additions and 8 deletions.
13 changes: 13 additions & 0 deletions examples/server/README.md
@@ -41,6 +41,7 @@ see https://github.com/ggerganov/llama.cpp/issues/1437
- `--grp-attn-w`: Set the group attention width to extend context size through self-extend (default: 512), used together with group attention factor `--grp-attn-n`
- `-n, --n-predict`: Set the maximum tokens to predict (default: -1)
- `--slots-endpoint-disable`: Disable the slots state monitoring endpoint. Slot states may contain user data, including prompts.
- `--metrics`: enable the Prometheus-compatible `/metrics` endpoint (default: disabled)
- `--chat-template JINJA_TEMPLATE`: Set custom jinja chat template. This parameter accepts a string, not a file name (default: template taken from model's metadata). We only support [some pre-defined templates](https://github.com/ggerganov/llama.cpp/wiki/Templates-supported-by-llama_chat_apply_template)

## Build
@@ -457,6 +458,18 @@ Notice that each `probs` is an array of length `n_probs`.
]
```

- **GET** `/metrics`: [Prometheus](https://prometheus.io/)-compatible metrics exporter endpoint, available if the server is started with `--metrics`:

Available metrics:
- `llamacpp:prompt_tokens_total`: Number of prompt tokens processed.
- `llamacpp:tokens_predicted_total`: Number of generation tokens processed.
- `llamacpp:prompt_tokens_seconds`: Average prompt throughput in tokens/s.
- `llamacpp:predicted_tokens_seconds`: Average generation throughput in tokens/s.
- `llamacpp:kv_cache_usage_ratio`: KV-cache usage. 1 means 100 percent usage.
- `llamacpp:kv_cache_tokens`: KV-cache tokens.
- `llamacpp:requests_processing`: Number of requests currently being processed.
- `llamacpp:requests_deferred`: Number of requests deferred.
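
For example, scraping the endpoint with `curl` returns the metrics in the Prometheus text exposition format. The sample below is illustrative (hypothetical values, default port assumed):

```shell
$ curl -s http://localhost:8080/metrics
# HELP llamacpp:prompt_tokens_total Number of prompt tokens processed.
# TYPE llamacpp:prompt_tokens_total counter
llamacpp:prompt_tokens_total 1024
# HELP llamacpp:prompt_tokens_seconds Average prompt throughput in tokens/s.
# TYPE llamacpp:prompt_tokens_seconds gauge
llamacpp:prompt_tokens_seconds 120
# HELP llamacpp:kv_cache_usage_ratio KV-cache usage. 1 means 100 percent usage.
# TYPE llamacpp:kv_cache_usage_ratio gauge
llamacpp:kv_cache_usage_ratio 0.25
```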

## More examples

### Change system prompt on runtime
150 changes: 144 additions & 6 deletions examples/server/server.cpp
@@ -43,6 +43,7 @@ struct server_params
int32_t read_timeout = 600;
int32_t write_timeout = 600;
bool slots_endpoint = true;
bool metrics_endpoint = false;
};

bool server_verbose = false;
@@ -310,6 +311,39 @@ struct llama_client_slot
}
};

struct llama_metrics {
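// lifetime totals, monotonically increasing and never reset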
uint64_t n_prompt_tokens_processed_total = 0;
uint64_t n_tokens_predicted_total = 0;

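// "bucket" values, cleared by reset_bucket() after they have been reported via a TASK_TYPE_METRICS task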
uint64_t n_prompt_tokens_processed = 0;
uint64_t t_prompt_processing = 0;

uint64_t n_tokens_predicted = 0;
uint64_t t_tokens_generation = 0;


void on_prompt_eval(const llama_client_slot &slot) {
n_prompt_tokens_processed_total += slot.num_prompt_tokens_processed;

n_prompt_tokens_processed += slot.num_prompt_tokens_processed;
t_prompt_processing += slot.t_prompt_processing;
}

void on_prediction(const llama_client_slot &slot) {
n_tokens_predicted_total += slot.n_decoded;

n_tokens_predicted += slot.n_decoded;
t_tokens_generation += slot.t_token_generation;
}

void reset_bucket() {
n_prompt_tokens_processed = 0;
t_prompt_processing = 0;
n_tokens_predicted = 0;
t_tokens_generation = 0;
}
};

struct llama_server_context
{
llama_model *model = nullptr;
@@ -344,6 +378,8 @@ struct llama_server_context
llama_server_queue queue_tasks;
llama_server_response queue_results;

llama_metrics metrics;

~llama_server_context()
{
if (ctx)
@@ -1404,7 +1440,7 @@ struct llama_server_context
case TASK_TYPE_NEXT_RESPONSE: {
// do nothing
} break;
case TASK_TYPE_SLOTS_DATA: {
case TASK_TYPE_METRICS: {
json slots_data = json::array();
int n_idle_slots = 0;
int n_processing_slots = 0;
@@ -1438,10 +1474,24 @@
res.stop = true;
res.error = false;
res.result_json = {
{ "idle", n_idle_slots },
{ "processing", n_processing_slots },
{ "slots", slots_data }
{ "idle", n_idle_slots },
{ "processing", n_processing_slots },
{ "deferred", queue_tasks.queue_tasks_deferred.size() },

{ "n_prompt_tokens_processed_total", metrics.n_prompt_tokens_processed_total},
{ "n_tokens_predicted_total", metrics.n_tokens_predicted_total},

{ "n_prompt_tokens_processed", metrics.n_prompt_tokens_processed},
{ "t_prompt_processing", metrics.t_prompt_processing},
{ "n_tokens_predicted", metrics.n_tokens_predicted},
{ "t_tokens_generation", metrics.t_tokens_generation},

{ "kv_cache_tokens_count", llama_get_kv_cache_token_count(ctx)},
{ "kv_cache_used_cells", llama_get_kv_cache_used_cells(ctx)},

{ "slots", slots_data },
};
metrics.reset_bucket();
queue_results.send(res);
} break;
}
@@ -1849,6 +1899,7 @@ struct llama_server_context
{
slot.t_start_genereration = ggml_time_us();
slot.t_prompt_processing = (slot.t_start_genereration - slot.t_start_process_prompt) / 1e3;
metrics.on_prompt_eval(slot);
}

llama_token_data_array cur_p = { slot.ctx_sampling->cur.data(), slot.ctx_sampling->cur.size(), false };
@@ -1871,6 +1922,7 @@
slot.release();
slot.print_timings();
send_final_response(slot);
metrics.on_prediction(slot);
}

slot.i_batch = -1;
@@ -1955,6 +2007,7 @@ static void server_print_usage(const char *argv0, const gpt_params &params,
printf(" --mmproj MMPROJ_FILE path to a multimodal projector file for LLaVA.\n");
printf(" --log-disable disables logging to a file.\n");
printf(" --slots-endpoint-disable disables slots monitoring endpoint.\n");
printf(" --metrics enable prometheus compatible metrics endpoint (default: %s).\n", sparams.metrics_endpoint ? "enabled" : "disabled");
printf("\n");
printf(" -n, --n-predict maximum tokens to predict (default: %d)\n", params.n_predict);
printf(" --override-kv KEY=TYPE:VALUE\n");
@@ -2414,6 +2467,10 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
{
sparams.slots_endpoint = false;
}
else if (arg == "--metrics")
{
sparams.metrics_endpoint = true;
}
else if (arg == "--chat-template")
{
if (++i >= argc)
@@ -2621,7 +2678,7 @@ int main(int argc, char **argv)
// request slots data using task queue
task_server task;
task.id = llama.queue_tasks.get_new_id();
task.type = TASK_TYPE_SLOTS_DATA;
task.type = TASK_TYPE_METRICS;
task.target_id = -1;

llama.queue_results.add_waiting_task_id(task.id);
@@ -2668,7 +2725,7 @@ int main(int argc, char **argv)
// request slots data using task queue
task_server task;
task.id = llama.queue_tasks.get_new_id();
task.type = TASK_TYPE_SLOTS_DATA;
task.type = TASK_TYPE_METRICS;
task.target_id = -1;

llama.queue_results.add_waiting_task_id(task.id);
@@ -2683,6 +2740,87 @@ int main(int argc, char **argv)
});
}

if (sparams.metrics_endpoint) {
svr.Get("/metrics", [&](const httplib::Request&, httplib::Response& res) {
// request slots data using task queue
task_server task;
task.id = llama.queue_tasks.get_new_id();
task.type = TASK_TYPE_METRICS;
task.target_id = -1;

llama.queue_results.add_waiting_task_id(task.id);
llama.queue_tasks.post(task);

// get the result
task_result result = llama.queue_results.recv(task.id);
llama.queue_results.remove_waiting_task_id(task.id);

json data = result.result_json;

uint64_t n_prompt_tokens_processed = data["n_prompt_tokens_processed"];
uint64_t t_prompt_processing = data["t_prompt_processing"];

uint64_t n_tokens_predicted = data["n_tokens_predicted"];
uint64_t t_tokens_generation = data["t_tokens_generation"];
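// note: the slots record these timings in milliseconds, hence the 1e3 factor in the tokens/s gauges below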

int32_t kv_cache_used_cells = data["kv_cache_used_cells"];

// metrics definition: https://prometheus.io/docs/practices/naming/#metric-names
json all_metrics_def = json {
{"counter", {{
{"name", "prompt_tokens_total"},
{"help", "Number of prompt tokens processed."},
{"value", data["n_prompt_tokens_processed_total"]}
}, {
{"name", "tokens_predicted_total"},
{"help", "Number of generation tokens processed."},
{"value", data["n_tokens_predicted_total"]}
}}},
{"gauge", {{
{"name", "prompt_tokens_seconds"},
{"help", "Average prompt throughput in tokens/s."},
{"value", n_prompt_tokens_processed ? 1e3 / t_prompt_processing * n_prompt_tokens_processed : 0}
},{
{"name", "predicted_tokens_seconds"},
{"help", "Average generation throughput in tokens/s."},
{"value", n_tokens_predicted ? 1e3 / t_tokens_generation * n_tokens_predicted : 0}
},{
{"name", "kv_cache_usage_ratio"},
{"help", "KV-cache usage. 1 means 100 percent usage."},
{"value", 1. * kv_cache_used_cells / params.n_ctx}
},{
{"name", "kv_cache_tokens"},
{"help", "KV-cache tokens."},
{"value", data["kv_cache_tokens_count"]}
},{
{"name", "requests_processing"},
{"help", "Number of request processing."},
{"value", data["processing"]}
},{
{"name", "requests_deferred"},
{"help", "Number of request deferred."},
{"value", data["deferred"]}
}}}
};

std::stringstream prometheus;
for (const auto& el : all_metrics_def.items()) {
const auto& type = el.key();
const auto& metrics_def = el.value();
for (const auto& metric_def : metrics_def) {
std::string name = metric_def["name"];
std::string help = metric_def["help"];
prometheus << "# HELP llamacpp:" << name << " " << help << "\n"
<< "# TYPE llamacpp:" << name << " " << type << "\n"
<< "llamacpp:" << name << " " << metric_def["value"] << "\n";
}
}

res.set_content(prometheus.str(), "text/plain; version=0.0.4");
res.status = 200; // HTTP OK
});
}

svr.set_logger(log_server_request);

svr.set_exception_handler([](const httplib::Request &, httplib::Response &res, std::exception_ptr ep)
2 changes: 2 additions & 0 deletions examples/server/tests/features/environment.py
@@ -16,6 +16,8 @@ def before_scenario(context, scenario):


def after_scenario(context, scenario):
if context.server_process is None:
return
if scenario.status == "failed":
if 'GITHUB_ACTIONS' in os.environ:
print(f"\x1b[33;101mSCENARIO FAILED: {scenario.name} server logs:\x1b[0m\n\n")
2 changes: 2 additions & 0 deletions examples/server/tests/features/server.feature
@@ -13,6 +13,7 @@ Feature: llama.cpp server
And 1 slots
And embeddings extraction
And 32 server max tokens to predict
And prometheus compatible metrics exposed
Then the server is starting
Then the server is healthy

@@ -25,6 +26,7 @@ Feature: llama.cpp server
And <n_predict> max tokens to predict
And a completion request with no api error
Then <n_predicted> tokens are predicted matching <re_content>
And prometheus metrics are exposed

Examples: Prompts
| prompt | n_predict | re_content | n_predicted |
27 changes: 27 additions & 0 deletions examples/server/tests/features/steps/steps.py
@@ -13,6 +13,7 @@
import openai
from behave import step
from behave.api.async_step import async_run_until_complete
from prometheus_client import parser


@step(u"a server listening on {server_fqdn}:{server_port}")
Expand All @@ -34,6 +35,8 @@ def step_server_config(context, server_fqdn, server_port):
context.server_api_key = None
context.server_continuous_batching = False
context.server_embeddings = False
context.server_metrics = False
context.server_process = None
context.server_seed = None
context.user_api_key = None

@@ -82,6 +85,11 @@ def step_server_embeddings(context):
context.server_embeddings = True


@step(u'prometheus compatible metrics exposed')
def step_server_metrics(context):
context.server_metrics = True


@step(u"the server is starting")
def step_start_server(context):
start_server_background(context)
@@ -424,6 +432,23 @@ def step_check_options_header_value(context, cors_header, cors_header_value):
assert context.options_response.headers[cors_header] == cors_header_value


@step(u'prometheus metrics are exposed')
@async_run_until_complete
async def step_prometheus_metrics_exported(context):
async with aiohttp.ClientSession() as session:
async with await session.get(f'{context.base_url}/metrics') as metrics_response:
assert metrics_response.status == 200
assert metrics_response.headers['Content-Type'] == "text/plain; version=0.0.4"
metrics_raw = await metrics_response.text()
metric_exported = False
for metric in parser.text_string_to_metric_families(metrics_raw):
match metric.name:
case "llamacpp:kv_cache_usage_ratio":
assert len(metric.samples) > 0
metric_exported = True
assert metric_exported, "No metrics exported"


async def concurrent_requests(context, f_completion, *args, **kwargs):
n_prompts = len(context.prompts)
if context.debug:
@@ -753,6 +778,8 @@ def start_server_background(context):
server_args.append('--cont-batching')
if context.server_embeddings:
server_args.append('--embedding')
if context.server_metrics:
server_args.append('--metrics')
if context.model_alias is not None:
server_args.extend(['--alias', context.model_alias])
if context.n_ctx is not None:
1 change: 1 addition & 0 deletions examples/server/tests/requirements.txt
@@ -1,3 +1,4 @@
aiohttp~=3.9.3
behave~=1.2.6
openai~=0.25.0
prometheus-client~=0.20.0
4 changes: 2 additions & 2 deletions examples/server/utils.hpp
@@ -50,7 +50,7 @@ enum task_type {
TASK_TYPE_COMPLETION,
TASK_TYPE_CANCEL,
TASK_TYPE_NEXT_RESPONSE,
TASK_TYPE_SLOTS_DATA
TASK_TYPE_METRICS
};

struct task_server {
@@ -441,7 +441,7 @@ struct llama_server_response
{
LOG_VERBOSE("queue_results.push_back", {});
queue_results.push_back(result);
condition_results.notify_one();
condition_results.notify_all();
return;
}
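
This one-line change (`notify_one` → `notify_all`) is the concurrency fix named in the commit title: every caller thread blocks on the same condition variable while waiting for the result that carries its own task id, so waking a single arbitrary waiter can leave the intended recipient asleep forever. Below is a minimal sketch of the failure mode and the fix, using hypothetical names rather than the server's exact types:

```cpp
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical reduction of llama_server_response: several caller threads
// each wait for the result tagged with their own task id.
struct result { int id; };

static std::mutex              mtx;
static std::condition_variable cv;
static std::deque<result>      results;

result recv(int id) {
    std::unique_lock<std::mutex> lock(mtx);
    std::deque<result>::iterator it;
    cv.wait(lock, [&] {
        it = std::find_if(results.begin(), results.end(),
                          [&](const result & r) { return r.id == id; });
        return it != results.end(); // sleep again unless *our* result arrived
    });
    result r = *it;
    results.erase(it);
    return r;
}

void send(const result & r) {
    std::lock_guard<std::mutex> lock(mtx);
    results.push_back(r);
    // notify_one() may wake a waiter whose predicate is still false (the
    // result is not for it); that waiter goes back to sleep and the intended
    // one is never woken. notify_all() makes every waiter re-check its
    // predicate, so the matching thread always proceeds.
    cv.notify_all();
}
```

A per-waiter condition variable would avoid the redundant wakeups, but `notify_all` keeps the shared result queue unchanged, and with a handful of HTTP threads the extra wakeups are negligible.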
}
Expand Down
