pipeline parallelism demo #4918

Closed · wants to merge 13 commits
9 changes: 9 additions & 0 deletions common/common.cpp
@@ -446,6 +446,12 @@ bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) {
break;
}
params.n_batch = std::stoi(argv[i]);
} else if (arg == "-ub" || arg == "--ubatch-size") {
if (++i >= argc) {
invalid_param = true;
break;
}
params.n_ubatch = std::stoi(argv[i]);
} else if (arg == "--keep") {
if (++i >= argc) {
invalid_param = true;
@@ -926,6 +932,8 @@ void gpt_print_usage(int /*argc*/, char ** argv, const gpt_params & params) {
printf(" -n N, --n-predict N number of tokens to predict (default: %d, -1 = infinity, -2 = until context filled)\n", params.n_predict);
printf(" -c N, --ctx-size N size of the prompt context (default: %d, 0 = loaded from model)\n", params.n_ctx);
printf(" -b N, --batch-size N batch size for prompt processing (default: %d)\n", params.n_batch);
printf(" -ub N, --ubatch-size N\n");
printf(" micro batch size for prompt processing (default: %d)\n", params.n_ubatch);
printf(" --samplers samplers that will be used for generation in the order, separated by \';\', for example: \"top_k;tfs;typical;top_p;min_p;temp\"\n");
printf(" --sampling-seq simplified sequence for samplers that will be used (default: %s)\n", sparams.samplers_sequence.c_str());
printf(" --top-k N top-k sampling (default: %d, 0 = disabled)\n", sparams.top_k);
@@ -1171,6 +1179,7 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param

cparams.n_ctx = params.n_ctx;
cparams.n_batch = params.n_batch;
cparams.n_ubatch = params.n_ubatch;
cparams.n_threads = params.n_threads;
cparams.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
cparams.mul_mat_q = params.mul_mat_q;
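
For context, a minimal sketch of how the new micro-batch setting flows from the CLI into the context parameters; the invocation and values are illustrative, and only functions touched by this diff are used:

// hypothetical invocation: ./main -m model.gguf -b 4096 -ub 256
gpt_params params;                 // defaults after this change: n_batch = 4096, n_ubatch = 256
gpt_params_parse(argc, argv, params);
llama_context_params cparams = llama_context_params_from_gpt_params(params);
// cparams.n_ubatch now carries the micro batch size alongside cparams.n_batch
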
3 changes: 2 additions & 1 deletion common/common.h
@@ -51,7 +51,8 @@ struct gpt_params {
int32_t n_threads_batch_draft = -1;
int32_t n_predict = -1; // new tokens to predict
int32_t n_ctx = 512; // context size
int32_t n_batch = 512; // batch size for prompt processing (must be >=32 to use BLAS)
int32_t n_batch = 4096; // batch size for prompt processing (must be >=32 to use BLAS)
int32_t n_ubatch = 256; // micro batch size for prompt processing (must be >=32 to use BLAS)
int32_t n_keep = 0; // number of tokens to keep from initial prompt
int32_t n_draft = 8; // number of tokens to draft during speculative decoding
int32_t n_chunks = -1; // max number of chunks to process (-1 = unlimited)
19 changes: 14 additions & 5 deletions examples/llama-bench/llama-bench.cpp
@@ -202,7 +202,7 @@ static void print_usage(int /* argc */, char ** argv) {
printf(" -mg, --main-gpu <i> (default: %s)\n", join(cmd_params_defaults.main_gpu, ",").c_str());
printf(" -nkvo, --no-kv-offload <0|1> (default: %s)\n", join(cmd_params_defaults.no_kv_offload, ",").c_str());
printf(" -mmq, --mul-mat-q <0|1> (default: %s)\n", join(cmd_params_defaults.mul_mat_q, ",").c_str());
printf(" -ts, --tensor_split <ts0/ts1/..> (default: 0)\n");
printf(" -ts, --tensor-split <ts0/ts1/..> (default: 0)\n");
printf(" -r, --repetitions <n> (default: %d)\n", cmd_params_defaults.reps);
printf(" -o, --output <csv|json|md|sql> (default: %s)\n", output_format_str(cmd_params_defaults.output_format));
printf(" -v, --verbose (default: %s)\n", cmd_params_defaults.verbose ? "1" : "0");
@@ -1041,16 +1041,22 @@ struct sql_printer : public printer {
};

static void test_prompt(llama_context * ctx, int n_prompt, int n_past, int n_batch, int n_threads) {
llama_set_n_threads(ctx, n_threads, n_threads);

std::vector<llama_token> tokens(n_prompt, llama_token_bos(llama_get_model(ctx)));
llama_decode(ctx, llama_batch_get_one(tokens.data(), n_prompt, n_past, 0));

GGML_UNUSED(n_batch);

/*
std::vector<llama_token> tokens(n_batch, llama_token_bos(llama_get_model(ctx)));
int n_processed = 0;

llama_set_n_threads(ctx, n_threads, n_threads);

while (n_processed < n_prompt) {
int n_tokens = std::min(n_prompt - n_processed, n_batch);
llama_decode(ctx, llama_batch_get_one(tokens.data(), n_tokens, n_past + n_processed, 0));
n_processed += n_tokens;
}
*/
}

static void test_gen(llama_context * ctx, int n_gen, int n_past, int n_threads) {
@@ -1149,11 +1155,12 @@ int main(int argc, char ** argv) {

// warmup run
if (t.n_prompt > 0) {
test_prompt(ctx, std::min(2, t.n_batch), 0, t.n_batch, t.n_threads);
test_prompt(ctx, std::min(t.n_batch, std::min(t.n_prompt, 32)), 0, t.n_batch, t.n_threads);
}
if (t.n_gen > 0) {
test_gen(ctx, 1, 0, t.n_threads);
}
llama_get_logits(ctx); // force sync

for (int i = 0; i < params.reps; i++) {
llama_kv_cache_clear(ctx);
@@ -1165,6 +1172,8 @@
if (t.n_gen > 0) {
test_gen(ctx, t.n_gen, t.n_prompt, t.n_threads);
}
llama_get_logits(ctx); // force sync

uint64_t t_ns = get_time_ns() - t_start;
t.samples_ns.push_back(t_ns);
}
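
With pipeline parallelism, llama_decode may return before the backends have finished, so the benchmark needs an explicit synchronization point before reading the clock; a sketch of the pattern used above (the timing helpers are the ones already in llama-bench.cpp):

// sketch: time a decode correctly when execution is asynchronous
uint64_t t_start = get_time_ns();
test_prompt(ctx, t.n_prompt, 0, t.n_batch, t.n_threads);
llama_get_logits(ctx);             // blocks until pending work completes (force sync)
uint64_t t_ns = get_time_ns() - t_start;
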
4 changes: 2 additions & 2 deletions examples/perplexity/perplexity.cpp
@@ -1782,13 +1782,13 @@ static void kl_divergence(llama_context * ctx, const gpt_params & params) {
int main(int argc, char ** argv) {
gpt_params params;

params.n_batch = 512;
//params.n_batch = 512;
if (!gpt_params_parse(argc, argv, params)) {
return 1;
}

params.logits_all = true;
params.n_batch = std::min(params.n_batch, params.n_ctx);
//params.n_batch = std::min(params.n_batch, params.n_ctx);

if (params.ppl_stride > 0) {
fprintf(stderr, "Will perform strided perplexity calculation -> adjusting context size from %d to %d\n",
12 changes: 12 additions & 0 deletions ggml-alloc.c
@@ -319,6 +319,18 @@ struct ggml_backend_buffer * ggml_tallocr_get_buffer(ggml_tallocr_t alloc) {
return alloc->buffer;
}

void ggml_tallocr_set_buffer(ggml_tallocr_t talloc, struct ggml_backend_buffer * buffer) {
GGML_ASSERT(talloc->measure == false);
// FIXME: buffer ownership semantics
// if the user is doing this, they probably want to take ownership of the buffer
// or they need to restore the original buffer before freeing the allocator
//talloc->buffer_owned = false;
talloc->buffer = buffer;
talloc->base = ggml_backend_buffer_get_base(buffer);
talloc->alignment = ggml_backend_buffer_get_alignment(buffer);
ggml_tallocr_reset(talloc);
}

void ggml_tallocr_free(ggml_tallocr_t alloc) {
if (alloc == NULL) {
return;
1 change: 1 addition & 0 deletions ggml-alloc.h
@@ -59,6 +59,7 @@ GGML_API ggml_tallocr_t ggml_tallocr_new_measure_from_buft(struct ggml_backend_b
GGML_API ggml_tallocr_t ggml_tallocr_new_measure_from_backend(struct ggml_backend * backend);

GGML_API struct ggml_backend_buffer * ggml_tallocr_get_buffer(ggml_tallocr_t talloc);
GGML_API void ggml_tallocr_set_buffer(ggml_tallocr_t talloc, struct ggml_backend_buffer * buffer);

GGML_API void ggml_tallocr_free (ggml_tallocr_t talloc);
GGML_API bool ggml_tallocr_is_measure (ggml_tallocr_t talloc);
21 changes: 16 additions & 5 deletions ggml-backend-impl.h
@@ -78,14 +78,14 @@ extern "C" {
ggml_backend_buffer_type_t (*GGML_CALL get_default_buffer_type)(ggml_backend_t backend);

// (optional) asynchronous tensor data access
void (*GGML_CALL set_tensor_async)(ggml_backend_t backend, struct ggml_tensor * tensor, const void * data, size_t offset, size_t size);
void (*GGML_CALL get_tensor_async)(ggml_backend_t backend, const struct ggml_tensor * tensor, void * data, size_t offset, size_t size);
bool (*GGML_CALL cpy_tensor_async)(ggml_backend_t backend, const struct ggml_tensor * src, struct ggml_tensor * dst);
void (*set_tensor_async)(ggml_backend_t backend, struct ggml_tensor * tensor, const void * data, size_t offset, size_t size);
void (*get_tensor_async)(ggml_backend_t backend, const struct ggml_tensor * tensor, void * data, size_t offset, size_t size);
bool (*cpy_tensor_async)(ggml_backend_t backend_src, ggml_backend_t backend_dst, const struct ggml_tensor * src, struct ggml_tensor * dst);

// (optional) complete all pending operations
void (*GGML_CALL synchronize)(ggml_backend_t backend);

// compute graph with a plan
// compute graph with a plan (not used currently)
ggml_backend_graph_plan_t (*GGML_CALL graph_plan_create) (ggml_backend_t backend, const struct ggml_cgraph * cgraph);
void (*GGML_CALL graph_plan_free) (ggml_backend_t backend, ggml_backend_graph_plan_t plan);
void (*GGML_CALL graph_plan_compute)(ggml_backend_t backend, ggml_backend_graph_plan_t plan);
@@ -95,14 +95,25 @@ extern "C" {

// check if the backend supports an operation
bool (*GGML_CALL supports_op)(ggml_backend_t backend, const struct ggml_tensor * op);

// (optional) event synchronization
ggml_backend_event_t (*GGML_CALL event_new) (ggml_backend_t backend);
void (*GGML_CALL event_free) (ggml_backend_event_t event);
void (*GGML_CALL event_record) (ggml_backend_event_t event);
void (*GGML_CALL event_wait) (ggml_backend_t backend, ggml_backend_event_t event);
void (*GGML_CALL event_synchronize) (ggml_backend_event_t event);
};

struct ggml_backend {
struct ggml_backend_i iface;

ggml_backend_context_t context;
};

struct ggml_backend_event {
ggml_backend_t backend;
void * context;
};

//
// Backend registry
//
60 changes: 47 additions & 13 deletions ggml-backend.c
@@ -194,21 +194,21 @@ void ggml_backend_tensor_get_async(ggml_backend_t backend, const struct ggml_ten
GGML_CALL void ggml_backend_tensor_set(struct ggml_tensor * tensor, const void * data, size_t offset, size_t size) {
ggml_backend_buffer_t buf = tensor->view_src ? tensor->view_src->buffer : tensor->buffer;

GGML_ASSERT(tensor->data != NULL && "tensor not allocated");
GGML_ASSERT(buf != NULL && "tensor buffer not set");
GGML_ASSERT(tensor->data != NULL && "tensor not allocated");
GGML_ASSERT(offset + size <= ggml_nbytes(tensor) && "tensor write out of bounds");

tensor->buffer->iface.set_tensor(buf, tensor, data, offset, size);
buf->iface.set_tensor(buf, tensor, data, offset, size);
}

GGML_CALL void ggml_backend_tensor_get(const struct ggml_tensor * tensor, void * data, size_t offset, size_t size) {
ggml_backend_buffer_t buf = tensor->view_src ? tensor->view_src->buffer : tensor->buffer;

GGML_ASSERT(buf != NULL && "tensor buffer not set");
GGML_ASSERT(tensor->data != NULL && "tensor not allocated");
GGML_ASSERT(tensor->buffer != NULL && "tensor buffer not set");
GGML_ASSERT(offset + size <= ggml_nbytes(tensor) && "tensor read out of bounds");

tensor->buffer->iface.get_tensor(buf, tensor, data, offset, size);
buf->iface.get_tensor(buf, tensor, data, offset, size);
}

void ggml_backend_synchronize(ggml_backend_t backend) {
@@ -279,30 +279,52 @@ void ggml_backend_tensor_copy(struct ggml_tensor * src, struct ggml_tensor * dst
}
}

void ggml_backend_tensor_copy_async(ggml_backend_t backend, struct ggml_tensor * src, struct ggml_tensor * dst) {
void ggml_backend_tensor_copy_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, struct ggml_tensor * src, struct ggml_tensor * dst) {
GGML_ASSERT(ggml_are_same_layout(src, dst) && "cannot copy tensors with different layouts");

if (src == dst) {
return;
}

if (ggml_backend_buft_supports_backend(src->buffer->buft, backend) && ggml_backend_buft_supports_backend(dst->buffer->buft, backend)) {
if (backend->iface.cpy_tensor_async != NULL) {
if (backend->iface.cpy_tensor_async(backend, src, dst)) {
return;
}
if (backend_dst->iface.cpy_tensor_async != NULL) {
if (backend_dst->iface.cpy_tensor_async(backend_src, backend_dst, src, dst)) {
return;
}
}

size_t nbytes = ggml_nbytes(src);
if (ggml_backend_buffer_is_host(src->buffer)) {
ggml_backend_tensor_set_async(backend, dst, src->data, 0, nbytes);
// wait for src to be ready before copy
ggml_backend_synchronize(backend_src);
ggml_backend_tensor_set_async(backend_dst, dst, src->data, 0, nbytes);
}
else {
ggml_backend_tensor_copy(src, dst);
}
}
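
The signature change means callers now name both endpoints explicitly; a minimal sketch (tensor and backend variables illustrative):

// async copy between two backends; falls back to a blocking copy when
// backend_dst does not implement cpy_tensor_async
ggml_backend_tensor_copy_async(backend_src, backend_dst, src_tensor, dst_tensor);
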

// events

ggml_backend_event_t ggml_backend_event_new(ggml_backend_t backend) {
return backend->iface.event_new(backend);
}

void ggml_backend_event_free(ggml_backend_event_t event) {
event->backend->iface.event_free(event);
free(event);
}

void ggml_backend_event_record(ggml_backend_event_t event) {
event->backend->iface.event_record(event);
}

void ggml_backend_event_synchronize(ggml_backend_event_t event) {
event->backend->iface.event_synchronize(event);
}

void ggml_backend_event_wait(ggml_backend_t backend, ggml_backend_event_t event) {
backend->iface.event_wait(backend, event);
}
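
A sketch of how these wrappers could coordinate two backends, assuming both sides implement the optional event interface introduced in ggml-backend-impl.h (tensor and size variables illustrative):

// producer: queue async work, then record an event after it
ggml_backend_event_t ev = ggml_backend_event_new(backend_src);
ggml_backend_tensor_set_async(backend_src, tensor, data, 0, size);
ggml_backend_event_record(ev);

// consumer: make backend_dst wait on the producer's event without blocking the host
ggml_backend_event_wait(backend_dst, ev);

// later: block the host until the event has fired, then free it
ggml_backend_event_synchronize(ev);
ggml_backend_event_free(ev);
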

// backend registry

@@ -716,6 +738,11 @@ static struct ggml_backend_i cpu_backend_i = {
/* .graph_plan_compute = */ ggml_backend_cpu_graph_plan_compute,
/* .graph_compute = */ ggml_backend_cpu_graph_compute,
/* .supports_op = */ ggml_backend_cpu_supports_op,
/* .event_new = */ NULL,
/* .event_free = */ NULL,
/* .event_record = */ NULL,
/* .event_wait = */ NULL,
/* .event_synchronize = */ NULL,
};

ggml_backend_t ggml_backend_cpu_init(void) {
@@ -853,6 +880,8 @@ static ggml_tallocr_t sched_allocr_from_buffer(ggml_backend_sched_t sched, ggml_
return sched->tallocs[i];
}
}

fprintf(stderr, "%s: error: no backend supports buffer type %s\n", __func__, ggml_backend_buffer_name(buffer));
GGML_ASSERT(false && "tensor buffer type not supported by any backend");
}

@@ -1315,6 +1344,7 @@ static void sched_compute_splits(ggml_backend_sched_t sched) {
// copy the input tensors to the split backend
uint64_t copy_start_us = ggml_time_us();
for (int j = 0; j < split->n_inputs; j++) {
ggml_backend_t input_backend = get_allocr_backend(sched, node_allocr(split->inputs[j]));
struct ggml_tensor * input = split->inputs[j];
struct ggml_tensor * input_cpy = sched->node_copies[hash_id(input)][split_backend_id];

@@ -1323,7 +1353,7 @@

// TODO: avoid this copy if it was already copied in a previous split, and the input didn't change
// this is important to avoid copying constants such as KQ_mask and inp_pos multiple times
ggml_backend_tensor_copy_async(split_backend, input, input_cpy);
ggml_backend_tensor_copy_async(input_backend, split_backend, input, input_cpy);
}
//ggml_backend_synchronize(split_backend); // necessary to measure copy time
int64_t copy_end_us = ggml_time_us();
@@ -1335,7 +1365,6 @@
ggml_graph_dump_dot(split->graph, NULL, split_filename);
#endif


uint64_t compute_start_us = ggml_time_us();
if (!sched->callback_eval) {
ggml_backend_graph_compute(split_backend, &split->graph);
@@ -1471,6 +1500,11 @@ void ggml_backend_sched_reset(ggml_backend_sched_t sched) {
sched_reset(sched);
}

void ggml_backend_sched_synchronize(ggml_backend_sched_t sched) {
for (int i = 0; i < sched->n_backends; i++) {
ggml_backend_synchronize(sched->backends[i]);
}
}
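
A hedged sketch of the intended call pattern: queue a graph, overlap host-side work, then drain all backends (the sched/graph variables are illustrative):

// queue graph computation; splits may still be in flight when this returns
ggml_backend_sched_graph_compute(sched, graph);

// ... overlap other host-side work here ...

// block until every backend has finished its splits
ggml_backend_sched_synchronize(sched);
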

void ggml_backend_sched_set_eval_callback(ggml_backend_sched_t sched, ggml_backend_sched_eval_callback callback, void * user_data) {
sched->callback_eval = callback;