From 36cb20ab4b86e380748b765f3c4d311e1c326370 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Thu, 29 Aug 2024 23:17:33 +0400 Subject: [PATCH 1/4] Infra --- samples/CMakeLists.txt | 1 + .../CMakeLists.txt | 25 ++++ ...ntinuous_batching_speculative_decoding.cpp | 140 ++++++++++++++++++ src/cpp/src/logit_processor.hpp | 14 ++ src/cpp/src/sampler.hpp | 131 ++++++++++++---- src/cpp/src/scheduler.hpp | 7 +- src/cpp/src/sequence_group.hpp | 57 ++++++- 7 files changed, 337 insertions(+), 38 deletions(-) create mode 100644 samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt create mode 100644 samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt index 5339817c1f..a25ace7558 100644 --- a/samples/CMakeLists.txt +++ b/samples/CMakeLists.txt @@ -6,6 +6,7 @@ add_subdirectory(cpp/beam_search_causal_lm) add_subdirectory(cpp/chat_sample) add_subdirectory(cpp/continuous_batching_accuracy) add_subdirectory(cpp/continuous_batching_benchmark) +# add_subdirectory(cpp/continuous_batching_speculative_decoding) add_subdirectory(cpp/greedy_causal_lm) add_subdirectory(cpp/multinomial_causal_lm) add_subdirectory(cpp/prompt_lookup_decoding_lm) diff --git a/samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt b/samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt new file mode 100644 index 0000000000..b6b8487b74 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +# start of dependencies + +include(FetchContent) + +FetchContent_Declare(cxxopts + URL https://github.com/jarro2783/cxxopts/archive/refs/tags/v3.1.1.tar.gz + URL_HASH SHA256=523175f792eb0ff04f9e653c90746c12655f10cb70f1d5e6d6d9491420298a08) + +FetchContent_Declare(nlohmann_json + URL https://github.com/nlohmann/json/archive/refs/tags/v3.11.3.tar.gz + URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406) + +FetchContent_MakeAvailable(cxxopts) +FetchContent_MakeAvailable(nlohmann_json) + +find_package(OpenVINO REQUIRED COMPONENTS Runtime) + +# end of dependencies + +set(TARGET_NAME continuous_batching_speculative_decoding) +add_executable(${TARGET_NAME} ${TARGET_NAME}.cpp "speculative_decoding_pipeline.hpp" "speculative_decoding_pipeline.cpp") +target_link_libraries(${TARGET_NAME} PRIVATE openvino::genai cxxopts::cxxopts) \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp new file mode 100644 index 0000000000..d3a4f01100 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp @@ -0,0 +1,140 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include "openvino/genai/generation_config.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +void print_generation_result(const ov::genai::GenerationResult& generation_result) { + for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) { + std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl; + } +} + +int main(int argc, char* argv[]) try { + // 
Command line options + + cxxopts::Options options("accuracy_sample", "Help command"); + + options.add_options() + ("n,num_prompts", "A number of prompts", cxxopts::value()->default_value("1")) + ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", cxxopts::value()->default_value("false")) + ("m,model", "Path to model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("a,assisting_model", "Path to assisting model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("k,candidates_number", "candidates_number", cxxopts::value()->default_value("5")) + ("g,generated_len", "generated_len", cxxopts::value()->default_value("30")) + ("h,help", "Print usage"); + + cxxopts::ParseResult result; + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cout << e.what() << "\n\n"; + std::cout << options.help() << std::endl; + return EXIT_FAILURE; + } + + if (result.count("help")) { + std::cout << options.help() << std::endl; + return EXIT_SUCCESS; + } + + const size_t num_prompts = result["num_prompts"].as(); + const bool dynamic_split_fuse = result["dynamic_split_fuse"].as(); + const std::string models_path = result["model"].as(); + const std::string assisting_models_path = result["assisting_model"].as(); + const size_t k = result["candidates_number"].as(); + const size_t g = result["generated_len"].as(); + + // create dataset + + std::vector prompt_examples = { + "What is OpenVINO?", + "How are you?", + "What is your name?", + "Tell me something about Canada", + "What is OpenVINO?", + }; + + auto greedy = ov::genai::greedy(); + greedy.max_new_tokens = g; + + std::vector sampling_params_examples { + // ov::genai::beam_search(), + greedy, + // ov::genai::multinomial(), + }; + + std::vector prompts(num_prompts); + std::vector sampling_params(num_prompts); + + for (size_t request_id = 0; request_id < num_prompts; ++request_id) { + prompts[request_id] = prompt_examples[request_id % prompt_examples.size()]; + sampling_params[request_id] = sampling_params_examples[request_id % sampling_params_examples.size()]; + } + + // Perform the inference + + ov::genai::SchedulerConfig scheduler_config; + // batch size + scheduler_config.max_num_batched_tokens = 32; + // cache params + scheduler_config.num_kv_blocks = 364; + scheduler_config.block_size = 32; + // mode - vLLM or dynamic_split_fuse + scheduler_config.dynamic_split_fuse = dynamic_split_fuse; + // vLLM specific params + scheduler_config.max_num_seqs = 2; + + // It's possible to construct a Tokenizer from a different path. + // If the Tokenizer isn't specified, it's loaded from the same folder. + SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, ov::genai::Tokenizer{models_path}, scheduler_config, "CPU"); + auto start_time = std::chrono::system_clock::now(); + std::vector generation_results = pipe.generate(prompts, sampling_params); + + for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) { + const ov::genai::GenerationResult & generation_result = generation_results[request_id]; + std::cout << "Question: " << prompts[request_id] << std::endl; + switch (generation_result.m_status) + { + case ov::genai::GenerationStatus::FINISHED: + print_generation_result(generation_result); + break; + case ov::genai::GenerationStatus::IGNORED: + std::cout << "Request was ignored due to lack of memory." 
< 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + std::cout << "Request was aborted." < 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + default: + break; + } + std::cout << std::endl; + } + auto end_time = std::chrono::system_clock::now(); + std::chrono::duration duration = end_time - start_time; + std::cout << std::endl; + std::cout << "Duration: " << duration.count() << std::endl; + std::cout << "Infer number: " << pipe.matches_info.size() << std::endl; + std::cout << "MAX matches number: " << pipe.max_matches << std::endl; + auto a = std::accumulate(pipe.matches_info.begin(), pipe.matches_info.end(), 0); + std::cout << "AVG matches number: " << (float(a) / pipe.matches_info.size()) << std::endl; + double c = double(pipe.assisting_model_duration) * 100 / std::chrono::duration_cast(duration).count(); + std::cout << "Speculative model time duration: " << pipe.assisting_model_duration * 1e-9 << " in %: " << c << std::endl; +} catch (const std::exception& error) { + std::cerr << error.what() << '\n'; + return EXIT_FAILURE; +} catch (...) { + std::cerr << "Non-exception object thrown\n"; + return EXIT_FAILURE; +} \ No newline at end of file diff --git a/src/cpp/src/logit_processor.hpp b/src/cpp/src/logit_processor.hpp index bdd15f429c..853224b2bd 100644 --- a/src/cpp/src/logit_processor.hpp +++ b/src/cpp/src/logit_processor.hpp @@ -367,6 +367,15 @@ class LogitProcessor { ++m_generated_tokens; } + void decrement_gen_tokens() { + OPENVINO_ASSERT(m_generated_tokens > 0); + --m_generated_tokens; + } + + size_t get_gen_token_len() { + return m_generated_tokens; + } + void register_new_generated_token(int64_t new_token_id) { auto it = m_unique_generated_token_ids->find(new_token_id); if (it == m_unique_generated_token_ids->end()) { @@ -375,4 +384,9 @@ class LogitProcessor { it->second++; } } + + void decrease_generated_token_occurance(int64_t token_id) { + OPENVINO_ASSERT(m_unique_generated_token_ids->count(token_id) > 0); + m_unique_generated_token_ids->at(token_id)--; + } }; diff --git a/src/cpp/src/sampler.hpp b/src/cpp/src/sampler.hpp index 2670e50d03..d39b59835c 100644 --- a/src/cpp/src/sampler.hpp +++ b/src/cpp/src/sampler.hpp @@ -206,12 +206,13 @@ class GroupBeamSearcher { class Sampler { - Logits _get_logit_vector(ov::Tensor logits, size_t batch_idx = 1) { + Logits _get_logit_vector(ov::Tensor logits, size_t batch_idx = 1, size_t token_offset = 0) { ov::Shape logits_shape = logits.get_shape(); size_t batch_size = logits_shape[0], seq_len = logits_shape[1], vocab_size = logits_shape[2]; OPENVINO_ASSERT(batch_idx <= batch_size); + OPENVINO_ASSERT(token_offset < seq_len); size_t batch_offset = batch_idx * seq_len * vocab_size; - size_t sequence_offset = (seq_len - 1) * vocab_size; + size_t sequence_offset = (seq_len - token_offset - 1) * vocab_size; float* logits_data = logits.data() + batch_offset + sequence_offset; return Logits{logits_data, vocab_size}; @@ -228,6 +229,15 @@ class Sampler { max_index = i; } } + // todo: remove: to print: + // std::vector logit_vector_copy(logits.m_size); + // logit_vector_copy = logits.m_size.m_data; + // std::sort(logit_vector_copy.begin(), logit_vector_copy.end(), [](Token& lhs, Token& rhs){ return lhs.m_log_prob > rhs.m_log_prob; }); + // std::cout << "N max_sampled_tokens: "; + // for (size_t i = 0; i < 5; ++i) { + // std::cout << logit_vector_copy[i].m_index << " "; + 
// } + // std::cout << std::endl; return Token(logits.m_data[max_index], max_index); } @@ -261,14 +271,21 @@ class Sampler { std::map m_logit_processors; public: - SamplerOutput sample(std::vector & sequence_groups, ov::Tensor logits); + SamplerOutput sample(std::vector & sequence_groups, ov::Tensor logits, bool is_validation_mode_enabled = false); void set_seed(size_t seed) { rng_engine.seed(seed); } void clear_beam_search_info(uint64_t request_id); + + void update_logit_processor(uint64_t request_id, uint64_t token_id) { + OPENVINO_ASSERT(m_logit_processors.count(request_id)); + auto& logit_processor = m_logit_processors.at(request_id); + logit_processor.decrease_generated_token_occurance(token_id); + logit_processor.decrement_gen_tokens(); + } }; -SamplerOutput Sampler::sample(std::vector & sequence_groups, ov::Tensor logits) { +SamplerOutput Sampler::sample(std::vector & sequence_groups, ov::Tensor logits, bool is_validation_mode_enabled) { const float * logits_data = logits.data(); ov::Shape logits_shape = logits.get_shape(); OPENVINO_ASSERT(logits_shape.size() == 3); @@ -295,44 +312,91 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, const void * sequence_group_logits_data = logits_data + vocab_size * currently_processed_tokens; ov::Tensor sequence_group_logits(ov::element::f32, ov::Shape{num_running_sequences, actual_seq_len, vocab_size}, (void *)sequence_group_logits_data); + size_t decrease_len = 0; if (sequence_group->requires_sampling()) { + auto token_id = actual_seq_len; + // prompt phase + if (sequence_group->get_num_processed_tokens() == 0) { + token_id -= (sequence_group->get_prompt_len() - 1); + } if (sampling_params.is_greedy_decoding() || sampling_params.is_multinomial()) { std::vector running_sequences = sequence_group->get_running_sequences(); - if (sampling_params.is_greedy_decoding()) { - OPENVINO_ASSERT(num_running_sequences == 1); + // limitation for speculative decoding + if (is_validation_mode_enabled || sampling_params.is_greedy_decoding()) { + OPENVINO_ASSERT(running_sequences.size() == 1); } - auto register_new_token = [&](const Token& sampled_token_id, Sequence::Ptr running_sequence) { + auto register_new_token = [&](const Token& sampled_token_id, size_t running_sequence_id, bool is_extend_sequence = true) { logit_processor.register_new_generated_token(sampled_token_id.m_index); - running_sequence->append_token(sampled_token_id.m_index, sampled_token_id.m_log_prob); + // increment seq len only for one sequence in sequence group to sync them + if (running_sequence_id == num_running_sequences - 1) { + logit_processor.increment_gen_tokens(); + } + if (is_extend_sequence) { + running_sequences[running_sequence_id]->append_token(sampled_token_id.m_index, sampled_token_id.m_log_prob); + } }; for (size_t running_sequence_id = 0; running_sequence_id < num_running_sequences; ++running_sequence_id) { - auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id); - logit_processor.apply(logit_vector); - Token sampled_token_id; - if (sampling_params.is_greedy_decoding()) { - sampled_token_id = _greedy_sample(logit_vector); - } else { - // is_multinomial() - const bool is_generate_n_tokens = sequence_group->num_total_seqs() == 1; - const size_t num_tokens_per_sequence = is_generate_n_tokens ? 
sampling_params.num_return_sequences : 1; - auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence); - sampled_token_id = sampled_token_ids[0]; - - if (is_generate_n_tokens) { - auto sequence_to_fork = running_sequences[0]; - std::list forked_seq_ids; - for (size_t i = num_running_sequences; i < num_tokens_per_sequence; ++i) { - const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork); - forked_seq_ids.push_back(forked_sequence->get_id()); - register_new_token(sampled_token_ids[i], forked_sequence); + const auto running_seq_token_ids = running_sequences[running_sequence_id]->get_generated_ids(); + // max lenght of new added tokens + auto max_new_tokens_cnt = sampling_params.max_new_tokens - running_seq_token_ids.size() + token_id; + // token offset in case of multiple token infer + auto token_id_per_seq = token_id; + while (--token_id_per_seq >= 0 && --max_new_tokens_cnt >= 0) { + if (max_new_tokens_cnt == 0) { + running_sequences[running_sequence_id]->remove_last_n_tokens(token_id_per_seq); + decrease_len = std::max(decrease_len, token_id_per_seq); + break; + } + Token sampled_token_id; + // get logit processors only for validation or generation, not for cases of extending KV cache + if (token_id_per_seq == 0 && !is_validation_mode_enabled || is_validation_mode_enabled) { + auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id, token_id_per_seq); + logit_processor.apply(logit_vector); + + if (sampling_params.is_greedy_decoding()) { + sampled_token_id = _greedy_sample(logit_vector); + } else { + // is_multinomial() + const bool is_generate_n_tokens = sequence_group->num_total_seqs() == 1; + const size_t num_tokens_per_sequence = is_generate_n_tokens ? sampling_params.num_return_sequences : 1; + auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence); + sampled_token_id = sampled_token_ids[0]; + + if (is_generate_n_tokens) { + auto sequence_to_fork = running_sequences[0]; + std::list forked_seq_ids; + for (size_t i = num_running_sequences; i < num_tokens_per_sequence; ++i) { + const auto forked_sequence_id = sequence_group->fork_sequence(sequence_to_fork)->get_id(); + forked_seq_ids.push_back(forked_sequence_id); + register_new_token(sampled_token_ids[i], forked_sequence_id); + } + sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids}); + } } - sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids}); } - } - - register_new_token(sampled_token_id, running_sequences[running_sequence_id]); + + // flag to add sampled token to generated sequence or extend logit processors only + bool is_extend_sequence = token_id_per_seq == 0; + if (token_id_per_seq > 0) { + auto it = running_seq_token_ids.rbegin(); + std::advance(it, token_id_per_seq - 1); + // to validate candidates from assisting model and remove incorrect ones from generated sequence + if (is_validation_mode_enabled && *it != sampled_token_id.m_index) { + running_sequences[running_sequence_id]->remove_last_n_tokens(token_id_per_seq); + decrease_len = std::max(decrease_len, token_id_per_seq); + is_extend_sequence = true; + token_id_per_seq = 0; + } else { + sampled_token_id.m_index = *it; + } + } + + register_new_token(sampled_token_id, running_sequence_id, is_extend_sequence); + if (token_id_per_seq == 0) { + break; + } + }; } - logit_processor.increment_gen_tokens(); for (const auto& dropped_seq_id : sequence_group->try_finish_generation()) { 
sampler_output.m_dropped_sequences.push_back(dropped_seq_id); } @@ -365,6 +429,9 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, // NOTE: it should be before 'get_num_scheduled_tokens' is used // update internal state of sequence group to reset scheduler tokens and update currently processed ones sequence_group->finish_iteration(); + if (decrease_len) { + sequence_group->decrease_processed_tokens(decrease_len); + } // accumulate a number of processed tokens currently_processed_tokens += padded_amount_of_processed_tokens * num_running_sequences; diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp index 3825caf446..caf8cd2cc4 100644 --- a/src/cpp/src/scheduler.hpp +++ b/src/cpp/src/scheduler.hpp @@ -62,6 +62,11 @@ class Scheduler { return scheduler_output; } + void clean_empty_blocks(std::vector& seq_groups) { + for (const auto& seq_group : seq_groups) + m_block_manager.free_group_partially(seq_group, m_block_manager.num_free_blocks()); + } + const std::vector& get_block_table(const Sequence& seq) { return m_block_manager.get_block_table(seq.get_id()); } @@ -318,7 +323,7 @@ class Scheduler { // prompt phases can have a single running sequence OPENVINO_ASSERT(num_running_seqs == 1); // here we also assume that sequence must be scheduler in a single shot and has no already generated context - if (!m_config.enable_prefix_caching) + if (!m_config.enable_prefix_caching && sequence_group->get_validation_len() == 0) OPENVINO_ASSERT(sequence_group->get_context_len() == 0); size_t num_available_tokens_in_megabatch = m_config.max_num_batched_tokens - scheduler_output.m_total_num_scheduled_tokens; diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 73298a702c..55c1a154aa 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -21,6 +21,7 @@ enum class SequenceStatus { }; using TokenIds = std::vector; +using Probs = std::vector; class SequenceGroup; class Sequence { @@ -31,6 +32,7 @@ class Sequence { } TokenIds m_generated_ids; + Probs m_generated_probs; uint64_t m_grouped_id; uint64_t m_id = _get_next_global_sequence_id(); SequenceStatus m_status = SequenceStatus::RUNNING; @@ -50,6 +52,7 @@ class Sequence { // don't use directly Sequence(const Sequence& seq, const uint64_t id) : m_generated_ids(seq.m_generated_ids), + m_generated_probs(seq.m_generated_probs), m_grouped_id(id), m_status(seq.m_status), m_cumulative_log_prob(seq.m_cumulative_log_prob){ @@ -107,7 +110,19 @@ class Sequence { // appends new tokens to a generated part void append_token(int64_t token_id, float log_prob) { m_cumulative_log_prob += log_prob; - m_generated_ids.push_back(token_id); + m_generated_ids.push_back(token_id); + m_generated_probs.push_back(log_prob); + } + + void remove_last_n_tokens(size_t k) { + const size_t generated_token_len = get_generated_len(); + OPENVINO_ASSERT(k <= generated_token_len); + for (size_t i = 0; i < k; ++i) { + m_cumulative_log_prob -= m_generated_probs[generated_token_len - 1 - i]; + } + size_t new_length = generated_token_len - k; + m_generated_ids.resize(new_length); + m_generated_probs.resize(new_length); } GenerationOutput get_last_generation_output() { @@ -127,6 +142,10 @@ class Sequence { return m_generated_ids; } + const Probs & get_log_probs() const { + return m_generated_probs; + } + float get_cumulative_log_probs() const { return m_cumulative_log_prob; } @@ -174,6 +193,8 @@ class SequenceGroup { size_t m_num_scheduled_tokens = 0; // context length of longest sequence within a group size_t 
m_max_content_len = 0; + // max validation length within a group to check generated tokens + size_t m_validation_len = 0; SequenceGroup(uint64_t request_id, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size, bool enable_prefix_caching) : m_request_id(request_id), @@ -360,24 +381,49 @@ class SequenceGroup { m_num_scheduled_tokens = 0; } + void clear_validated_tokens() { + m_validation_len = 0; + } + bool is_scheduled() const { return m_num_scheduled_tokens > 0; } + void set_validation_len(size_t k) { + m_validation_len = k; + } + + size_t get_validation_len() { + return m_validation_len; + } + size_t get_num_available_tokens_for_batching() const { OPENVINO_ASSERT(!has_finished(), "Internal error: this function cannot be called on finished sequence group"); OPENVINO_ASSERT(get_num_scheduled_tokens() == 0, "Internal error: this function cannot be called when we are already in scheduling phase"); // if sequence group has not finished, it has at least one token to process - size_t num_available_tokens = std::max(get_prompt_len(), m_max_content_len); + size_t num_available_tokens = std::max(get_prompt_len(), m_max_content_len) + m_validation_len; return std::max(num_available_tokens - m_num_processed_tokens, 1u); } + void increase_processed_tokens(size_t token_cnt) { + m_num_processed_tokens += token_cnt; + m_max_content_len += token_cnt; + } + + void decrease_processed_tokens(size_t token_cnt) { + OPENVINO_ASSERT(m_num_processed_tokens >= token_cnt); + m_num_processed_tokens -= token_cnt; + OPENVINO_ASSERT(m_max_content_len >= token_cnt); + m_max_content_len -= token_cnt; + } + // mark current schedule phase as finished and updates internal counters void finish_iteration() { m_num_processed_tokens += m_num_scheduled_tokens; // if some processed tokens were evicted, max content len is greater than number of processed tokens m_max_content_len = std::max(m_max_content_len, m_num_processed_tokens); clear_scheduled_tokens(); + clear_validated_tokens(); } void update_processed_tokens_num(size_t processed_tokens) { @@ -513,9 +559,10 @@ class SequenceGroup { } } else if (m_sampling_params.is_greedy_decoding() || m_sampling_params.is_multinomial()) { // TO DO: Now we always stream for greedy search for the sake of benchmarking - if (num_total_seqs() == 1) { - push_partial_outputs(); - } else if (has_finished() || out_of_memory()) { + // if (num_total_seqs() == 1) { + // push_partial_outputs(); + // } else + if (has_finished() || out_of_memory()) { push_outputs(); } } From a538e24e35e1a299c08c591c08526be93ed20f65 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 30 Aug 2024 16:29:41 +0400 Subject: [PATCH 2/4] Speculative decoding --- samples/CMakeLists.txt | 4 +- .../CMakeLists.txt | 25 ++ .../continuous_batching_prompt_lookup.cpp | 138 +++++++++++ .../prompt_lookup_pipeline.cpp | 189 +++++++++++++++ .../prompt_lookup_pipeline.hpp | 46 ++++ ...ntinuous_batching_speculative_decoding.cpp | 16 +- .../speculative_decoding_pipeline.cpp | 217 ++++++++++++++++++ .../speculative_decoding_pipeline.hpp | 37 +++ .../genai/continuous_batching_pipeline.hpp | 38 +++ src/cpp/src/block_manager.hpp | 15 ++ src/cpp/src/continuous_batching_pipeline.cpp | 200 ++++++++++++++-- .../src/paged_attention_transformations.cpp | 5 +- src/cpp/src/scheduler.hpp | 2 +- 13 files changed, 899 insertions(+), 33 deletions(-) create mode 100644 samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt create mode 100644 samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp 
create mode 100644 samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp create mode 100644 samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp create mode 100644 samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp create mode 100644 samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt index a25ace7558..9fc0642275 100644 --- a/samples/CMakeLists.txt +++ b/samples/CMakeLists.txt @@ -6,7 +6,9 @@ add_subdirectory(cpp/beam_search_causal_lm) add_subdirectory(cpp/chat_sample) add_subdirectory(cpp/continuous_batching_accuracy) add_subdirectory(cpp/continuous_batching_benchmark) -# add_subdirectory(cpp/continuous_batching_speculative_decoding) +# todo: iefode +# add_subdirectory(cpp/continuous_batching_prompt_lookup) +add_subdirectory(cpp/continuous_batching_speculative_decoding) add_subdirectory(cpp/greedy_causal_lm) add_subdirectory(cpp/multinomial_causal_lm) add_subdirectory(cpp/prompt_lookup_decoding_lm) diff --git a/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt b/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt new file mode 100644 index 0000000000..b122782a2e --- /dev/null +++ b/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +# start of dependencies + +include(FetchContent) + +FetchContent_Declare(cxxopts + URL https://github.com/jarro2783/cxxopts/archive/refs/tags/v3.1.1.tar.gz + URL_HASH SHA256=523175f792eb0ff04f9e653c90746c12655f10cb70f1d5e6d6d9491420298a08) + +FetchContent_Declare(nlohmann_json + URL https://github.com/nlohmann/json/archive/refs/tags/v3.11.3.tar.gz + URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406) + +FetchContent_MakeAvailable(cxxopts) +FetchContent_MakeAvailable(nlohmann_json) + +find_package(OpenVINO REQUIRED COMPONENTS Runtime) + +# end of dependencies + +set(TARGET_NAME continuous_batching_prompt_lookup) +add_executable(${TARGET_NAME} ${TARGET_NAME}.cpp "prompt_lookup_pipeline.hpp" "prompt_lookup_pipeline.cpp") +target_link_libraries(${TARGET_NAME} PRIVATE openvino::genai cxxopts::cxxopts) \ No newline at end of file diff --git a/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp b/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp new file mode 100644 index 0000000000..3219d2e74f --- /dev/null +++ b/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp @@ -0,0 +1,138 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include "openvino/genai/generation_config.hpp" + +#include "prompt_lookup_pipeline.hpp" + +void print_generation_result(const ov::genai::GenerationResult& generation_result) { + for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) { + std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl; + } +} + +int main(int argc, char* argv[]) try { + // Command line options + + cxxopts::Options options("accuracy_sample", "Help command"); + + options.add_options() + ("n,num_prompts", "A number of prompts", cxxopts::value()->default_value("1")) + ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", 
cxxopts::value()->default_value("false")) + ("m,model", "Path to model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("k,candidates_number", "candidates_number", cxxopts::value()->default_value("5")) + ("ngram", "Ngram", cxxopts::value()->default_value("5")) + ("g,generated_len", "generated_len", cxxopts::value()->default_value("30")) + ("h,help", "Print usage"); + + cxxopts::ParseResult result; + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cout << e.what() << "\n\n"; + std::cout << options.help() << std::endl; + return EXIT_FAILURE; + } + + if (result.count("help")) { + std::cout << options.help() << std::endl; + return EXIT_SUCCESS; + } + + const size_t num_prompts = result["num_prompts"].as(); + const bool dynamic_split_fuse = result["dynamic_split_fuse"].as(); + const std::string models_path = result["model"].as(); + const size_t k = result["candidates_number"].as(); + const size_t g = result["generated_len"].as(); + const size_t n = result["ngram"].as(); + + // create dataset + + std::vector prompt_examples = { + // "What is OpenVINO?", + // "How are you?", + "code: ```for (const auto& a : b) { std::cout << a << std::endl; }```", + "Tell me something about Canada", + "What is OpenVINO?", + }; + + auto greedy = ov::genai::greedy(); + greedy.max_new_tokens = g; + + std::vector sampling_params_examples { + // ov::genai::beam_search(), + greedy, + // ov::genai::multinomial(), + }; + + std::vector prompts(num_prompts); + std::vector sampling_params(num_prompts); + + for (size_t request_id = 0; request_id < num_prompts; ++request_id) { + prompts[request_id] = prompt_examples[request_id % prompt_examples.size()]; + sampling_params[request_id] = sampling_params_examples[request_id % sampling_params_examples.size()]; + } + + // Perform the inference + + ov::genai::SchedulerConfig scheduler_config; + // batch size + scheduler_config.max_num_batched_tokens = 256; + // cache params + scheduler_config.num_kv_blocks = 364; + scheduler_config.block_size = 32; + // mode - vLLM or dynamic_split_fuse + scheduler_config.dynamic_split_fuse = dynamic_split_fuse; + // vLLM specific params + scheduler_config.max_num_seqs = 2; + + // It's possible to construct a Tokenizer from a different path. + // If the Tokenizer isn't specified, it's loaded from the same folder. + PromptLookupPipeline pipe(models_path, k, n, ov::genai::Tokenizer{models_path}, scheduler_config, "CPU"); + auto start_time = std::chrono::system_clock::now(); + std::vector generation_results = pipe.generate(prompts, sampling_params); + + for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) { + const ov::genai::GenerationResult & generation_result = generation_results[request_id]; + std::cout << "Question: " << prompts[request_id] << std::endl; + switch (generation_result.m_status) + { + case ov::genai::GenerationStatus::FINISHED: + print_generation_result(generation_result); + break; + case ov::genai::GenerationStatus::IGNORED: + std::cout << "Request was ignored due to lack of memory." < 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + std::cout << "Request was aborted." 
< 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + default: + break; + } + std::cout << std::endl; + } + auto end_time = std::chrono::system_clock::now(); + std::chrono::duration duration = end_time - start_time; + std::cout << std::endl; + std::cout << "Duration: " << duration.count() << std::endl; + std::cout << "Infer number: " << pipe.infer_cnt << std::endl; + std::cout << "MAX matches number: " << pipe.max_matches << std::endl; + std::cout << "AVG matches number: " << (float(pipe.avg_matches) / pipe.infer_cnt) << std::endl; +} catch (const std::exception& error) { + std::cerr << error.what() << '\n'; + return EXIT_FAILURE; +} catch (...) { + std::cerr << "Non-exception object thrown\n"; + return EXIT_FAILURE; +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp new file mode 100644 index 0000000000..af95d62f4b --- /dev/null +++ b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp @@ -0,0 +1,189 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "prompt_lookup_pipeline.hpp" + +PromptLookupPipeline::PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t ngram_size, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + ov::genai::Tokenizer tokenizer(models_path); + PromptLookupPipeline(models_path, candidates_number, max_ngram_size, tokenizer, scheduler_config, device, plugin_config); +}; + +PromptLookupPipeline::PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t ngram_size, + const ov::genai::Tokenizer& tokenizer, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + m_tokenizer = tokenizer; + set_k(candidates_number); + max_ngram_size = ngram_size; + + model_pipeline = ov::genai::ContinuousBatchingPipeline(models_path, m_tokenizer, scheduler_config, device, plugin_config); + model_pipeline.enable_validation_mode(); +} + +ov::genai::PipelineMetrics PromptLookupPipeline::get_metrics() const { + return model_pipeline.get_metrics(); +} + +void PromptLookupPipeline::step() { + std::cout << "=======STEP==================" << std::endl; + bool is_updated = false; + if (is_speculative_mode) { + // predict tokens using prompt + std::cout << "num_candidates: " << candidates_number << std::endl; + for (const auto& whole_input : model_pipeline.get_prompts_with_generated_tokens()) { + auto updated_input = whole_input; + const auto& input_ids = whole_input.token_ids; + const size_t input_length = input_ids.size(); + for (int32_t ngram_size = max_ngram_size; ngram_size > 0; ngram_size--) { + std::vector ngram = std::vector{input_ids.cend() - ngram_size, input_ids.cend()}; + std::cout << "ngram: " << std::endl; + for (const auto& a : ngram) { + std::cout << a; + } + std::cout << std::endl; + + // find ngram match in input_ids + size_t ngram_i = 0; + for (size_t input_i = 0; input_i < input_length - ngram_size; input_i++) { + if (ngram[ngram_i] != input_ids[input_i]) { + ngram_i = 0; + continue; + } + ngram_i++; + + if (ngram_i < ngram_size) { + continue; + } + + // match found with the end at input_i + size_t avaliable_num_pred = std::min(input_length - (input_i + 1), candidates_number); + + // return candidates with length 
of avaliable_num_pred + std::vector candidate{input_ids.cbegin() + input_i + 1, + input_ids.cbegin() + input_i + 1 + avaliable_num_pred}; + updated_input.token_ids = candidate; + updated_input.log_probs = std::vector(candidate.size(), 0); + + model_pipeline.update_generated_sequence(updated_input); + break; + } + if (whole_input.token_ids != updated_input.token_ids) { + is_updated = true; + break; + } + } + } + + // put candidates to model cache + auto candidate_sequences = model_pipeline.get_generated_sequences(); + // todo: remove debug code + for (const auto& s : candidate_sequences) { + std::cout << "ASSISTANT: "; + for (const auto& d : s.token_ids) { + std::cout << d << " "; + } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + std::cout << std::endl; + std::cout << decode(s.token_ids) << std::endl; + } + } + + const auto gen_seq_before = model_pipeline.get_generated_sequences(); + + // validate candidates and generate 1 new token + model_pipeline.step(); + + if (is_speculative_mode && is_updated) { + // todo: remove debug code + for (const auto& s : model_pipeline.get_generated_sequences()) { + std::cout << "MODEL: "; + for (const auto& d : s.token_ids) { + std::cout << d << " "; + } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + std::cout << std::endl; + std::cout << decode(s.token_ids) << std::endl; + std::cout << std::endl; + } + + // todo: iefode: remove debug prints + for (const auto& gen_seq_after : model_pipeline.get_generated_sequences()) { + const auto& candidate_seq = gen_seq_before[gen_seq_after.request_id]; + size_t before_len = candidate_seq.token_ids.size(), + after_len = gen_seq_after.token_ids.size(); + size_t dist = is_updated ? (after_len <= before_len ? (before_len - after_len) : candidates_number) : 0; + update_strategy(dist); + } + // ov::genai::ContinuousBatchingPipeline::UpdateSeqResult update_result; + // for (const auto& checked_sequence : checked_sequences) { + // update_result = assisting_pipeline.update_generated_sequence(checked_sequence); + // } + + // OPENVINO_ASSERT(candidates_number >= update_result.to_remove); + // if (update_result.to_remove) { + // std::cout << "to_remove: " << update_result.to_remove << std::endl; + // } + // update_strategy(candidates_number - update_result.to_remove); + // std::cout << "=========================" << std::endl; + } +} + +void PromptLookupPipeline::update_strategy(size_t num_matches) { + std::cout << "num_matches: " << num_matches << std::endl; + max_matches = std::max(max_matches, num_matches); + avg_matches += num_matches; + if (max_candidates_number == 0) { + return; + } + if (num_matches == candidates_number) { + candidates_number = std::min(candidates_number + 2, max_candidates_number); + } else { + candidates_number = std::max(int64_t(candidates_number) - 1, int64_t(1)); + } +} + + +void PromptLookupPipeline::set_k(size_t new_default_k) { + candidates_number = new_default_k; + max_candidates_number = new_default_k * 2; + is_speculative_mode = candidates_number > 0; +} + +bool PromptLookupPipeline::has_non_finished_requests() { + return model_pipeline.has_non_finished_requests(); +} + + +std::vector +PromptLookupPipeline::generate_sequences( + const std::vector prompts, + std::vector sampling_params) { + OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. 
Use ContinuousBatchingPipeline::add_request"); + OPENVINO_ASSERT(prompts.size() == sampling_params.size()); + + std::vector generations, assisting_generations; + for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { + generations.push_back(model_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id])); + } + + while (has_non_finished_requests()) { + step(); + infer_cnt++; + } + + return generations; +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp new file mode 100644 index 0000000000..c48f70bb9e --- /dev/null +++ b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp @@ -0,0 +1,46 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "openvino/genai/cb_basic_pipeline.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +class PromptLookupPipeline : public ov::genai::BasicPipeline { + ov::genai::ContinuousBatchingPipeline model_pipeline; + size_t candidates_number = 0, max_candidates_number = 0, max_ngram_size = 0; + bool is_speculative_mode = false; + std::map> m_encoded_prompt; + + std::vector generate_sequences( + const std::vector prompts, std::vector sampling_params) override; + + +public: + size_t infer_cnt = 0, max_matches = 0, avg_matches = 0; + + PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t max_ngram_size, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t max_ngram_size, + const ov::genai::Tokenizer& tokenizer, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + ov::genai::PipelineMetrics get_metrics() const override; + + void step() override; + + bool has_non_finished_requests() override; + + void set_k(size_t new_default_k); + + void update_strategy(size_t num_matches); +}; \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp index d3a4f01100..f5589bfcdc 100644 --- a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp +++ b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp @@ -5,7 +5,7 @@ #include #include "openvino/genai/generation_config.hpp" -#include "openvino/genai/continuous_batching_pipeline.hpp" +#include "speculative_decoding_pipeline.hpp" void print_generation_result(const ov::genai::GenerationResult& generation_result) { for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) { @@ -90,7 +90,7 @@ int main(int argc, char* argv[]) try { // It's possible to construct a Tokenizer from a different path. // If the Tokenizer isn't specified, it's loaded from the same folder. 
- SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, ov::genai::Tokenizer{models_path}, scheduler_config, "CPU"); + SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, scheduler_config, "CPU"); auto start_time = std::chrono::system_clock::now(); std::vector generation_results = pipe.generate(prompts, sampling_params); @@ -125,12 +125,12 @@ int main(int argc, char* argv[]) try { std::chrono::duration duration = end_time - start_time; std::cout << std::endl; std::cout << "Duration: " << duration.count() << std::endl; - std::cout << "Infer number: " << pipe.matches_info.size() << std::endl; - std::cout << "MAX matches number: " << pipe.max_matches << std::endl; - auto a = std::accumulate(pipe.matches_info.begin(), pipe.matches_info.end(), 0); - std::cout << "AVG matches number: " << (float(a) / pipe.matches_info.size()) << std::endl; - double c = double(pipe.assisting_model_duration) * 100 / std::chrono::duration_cast(duration).count(); - std::cout << "Speculative model time duration: " << pipe.assisting_model_duration * 1e-9 << " in %: " << c << std::endl; + std::cout << "Infer number: " << pipe.m_matches_info.size() << std::endl; + std::cout << "MAX matches number: " << pipe.m_max_matches << std::endl; + auto a = std::accumulate(pipe.m_matches_info.begin(), pipe.m_matches_info.end(), 0); + std::cout << "AVG matches number: " << (float(a) / pipe.m_matches_info.size()) << std::endl; + double c = double(pipe.m_speculative_model_duration) * 100 / std::chrono::duration_cast(duration).count(); + std::cout << "Speculative model time duration: " << pipe.m_speculative_model_duration * 1e-9 << " in %: " << c << std::endl; } catch (const std::exception& error) { std::cerr << error.what() << '\n'; return EXIT_FAILURE; diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp new file mode 100644 index 0000000000..9c4b96da06 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp @@ -0,0 +1,217 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "speculative_decoding_pipeline.hpp" + +inline size_t get_kv_cache_size(const std::shared_ptr& model) { + const auto& parameters = model->get_parameters(); + // extract num_kv_heads and head_size + size_t kv_caches_inputs_offset = 2; + ov::PartialShape k_shape = parameters[kv_caches_inputs_offset]->get_partial_shape(); + OPENVINO_ASSERT(k_shape.rank().get_length() == 3, "KV cache shape is expected to have rank 3, while shape is ", k_shape); + size_t num_kv_heads = k_shape[1].get_length(), head_size = k_shape[2].get_length(); + return num_kv_heads * head_size; +} + +SpeculativeDecodingPipeline::SpeculativeDecodingPipeline( + const std::string& models_path, + const std::string& speculative_models_path, + size_t start_candidates_number, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + m_tokenizer = ov::genai::Tokenizer(models_path, plugin_config); + + m_candidates_num = start_candidates_number; + m_max_candidates_num = m_candidates_num * 2; + m_is_speculative_mode = m_candidates_num > 0; + + ov::Core core; + std::shared_ptr model = ov::genai::read_model_and_apply_paged_attention(models_path, core), + assisting_model = ov::genai::read_model_and_apply_paged_attention(speculative_models_path, core); + + ov::genai::SchedulerConfig 
model_scheduler_config = scheduler_config, + assisting_scheduler_config = scheduler_config; + if (m_is_speculative_mode) { + size_t model_cache_size = get_kv_cache_size(model), + assisting_cache_size = get_kv_cache_size(assisting_model); + auto k = float(assisting_cache_size) / (model_cache_size + assisting_cache_size); + auto cache_size = scheduler_config.num_kv_blocks; + auto assisted_cache_size = size_t(cache_size * k); + cache_size -= assisted_cache_size; + model_scheduler_config.num_kv_blocks = cache_size; + assisting_scheduler_config.num_kv_blocks = assisted_cache_size; + m_speculative_pipeline = ov::genai::ContinuousBatchingPipeline(core, assisting_model, m_tokenizer, assisting_scheduler_config, device, plugin_config, false); + } + + m_pipeline = ov::genai::ContinuousBatchingPipeline(core, model, m_tokenizer, model_scheduler_config, device, plugin_config, true); + +} + +void SpeculativeDecodingPipeline::step() { + // std::cout << "=======STEP==================" << std::endl; + std::vector candidate_sequences; + if (m_is_speculative_mode) { + // generate candidates using small model + // std::cout << "num_candidates: " << candidates_number << std::endl; + for (size_t i = 0; i < m_candidates_num; ++i) { + auto start_time = std::chrono::system_clock::now(); + m_speculative_pipeline.step(); + auto end_time = std::chrono::system_clock::now(); + m_speculative_model_duration += std::chrono::duration_cast(end_time - start_time).count(); + } + + // put candidates to model cache + candidate_sequences = m_speculative_pipeline.get_generated_sequences(); + // todo: remove debug code + // for (const auto& s : candidate_sequences) { + // std::cout << "ASSISTANT: "; + // for (const auto& d : s.token_ids) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // std::cout << decode(s.token_ids) << std::endl; + // } + + for (const auto& candidate : candidate_sequences) { + m_pipeline.update_generated_sequence(candidate); + } + } + + // validate candidates and generate 1 new token + m_pipeline.step(); + + if (m_is_speculative_mode) { + // todo: iefode: remove debug prints + auto checked_sequences = m_pipeline.get_generated_sequences(); + // todo: remove debug code + // for (const auto& s : checked_sequences) { + // std::cout << "MODEL: "; + // for (const auto& d : s.token_ids) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // std::cout << decode(s.token_ids) << std::endl; + // std::cout << std::endl; + // } + + ov::genai::ContinuousBatchingPipeline::UpdateSeqResult update_result; + for (const auto& checked_sequence : checked_sequences) { + update_result = m_speculative_pipeline.update_generated_sequence(checked_sequence); + } + + OPENVINO_ASSERT(m_candidates_num >= update_result.to_remove); + // if (update_result.to_remove) { + // std::cout << "to_remove: " << update_result.to_remove << std::endl; + // } + update_strategy(m_candidates_num - update_result.to_remove); + // std::cout << "=========================" << std::endl; + } +} + +std::vector +SpeculativeDecodingPipeline::generate(const std::vector& prompts, + const std::vector& sampling_params) { + OPENVINO_ASSERT(!m_pipeline.has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. 
Use ContinuousBatchingPipeline::add_request"); + OPENVINO_ASSERT(prompts.size() == sampling_params.size()); + + std::vector generations, speculative_generations; + for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { + generations.push_back(m_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id])); + if (m_is_speculative_mode) { + auto assisting_sampling_params = sampling_params[request_id]; + assisting_sampling_params.max_new_tokens += m_max_candidates_num; + assisting_sampling_params.min_new_tokens += m_max_candidates_num; + speculative_generations.push_back(m_speculative_pipeline.add_request(request_id, prompts[request_id], assisting_sampling_params)); + } + } + + while (m_pipeline.has_non_finished_requests()) { + step(); + } + if (m_is_speculative_mode) { + m_speculative_pipeline.finish_all_requests(); + } + + std::vector encoded_results; + for (size_t generation_idx = 0; generation_idx < generations.size(); ++generation_idx) { + const auto& generation = generations[generation_idx]; + ov::genai::EncodedGenerationResult result; + result.m_request_id = 1; + std::vector generation_outputs = generation->read_all(); + std::sort(generation_outputs.begin(), generation_outputs.end(), [=] (ov::genai::GenerationOutput& r1, ov::genai::GenerationOutput& r2) { + return r1.score > r2.score; + }); + + auto num_outputs = std::min(sampling_params[generation_idx].num_return_sequences, generation_outputs.size()); + for (size_t generation_output_idx = 0; generation_output_idx < num_outputs; ++generation_output_idx) { + const auto& generation_output = generation_outputs[generation_output_idx]; + result.m_generation_ids.push_back(std::move(generation_output.generated_token_ids)); + result.m_scores.push_back(generation_output.score); + } + result.m_status = generation->get_status(); + encoded_results.push_back(std::move(result)); + } + + OPENVINO_ASSERT(encoded_results.size() == prompts.size()); + + std::vector decoded_results; + for (ov::genai::EncodedGenerationResult& res : encoded_results) { + std::vector generated; + generated.reserve(res.m_generation_ids.size()); + for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { + generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); + } + decoded_results.push_back(ov::genai::GenerationResult{ + res.m_request_id, + std::move(generated), + std::move(res.m_scores), + res.m_status + }); + } + return decoded_results; +} + +inline size_t get_median(std::vector values) { + const auto size = values.size(); + if (size == 0) { + return 0; + } + size_t offset = values.size() / 2; + + auto it = values.begin() + offset; + std::nth_element(values.begin(), it, values.end()); + + if (size % 2 != 0) { + return *it; + } + auto it_1 = values.begin() + offset - 1; + std::nth_element(values.begin(), it_1, values.end()); + return (*it + *it_1) / 2; +} + + +void SpeculativeDecodingPipeline::update_strategy(size_t num_matches) { + // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << m_candidates_num << std::endl; + if (m_max_candidates_num == 0) { + return; + } + + if (m_max_matches < num_matches) { + m_max_matches = num_matches; + } + if (num_matches == m_candidates_num) { + m_candidates_num = std::min(std::max(m_candidates_num + 1, m_max_matches), m_max_candidates_num); + } else { + m_candidates_num = num_matches > 0 ? 
num_matches : std::max(get_median(m_matches_info), size_t(1)); + } + m_matches_info.push_back(num_matches); +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp new file mode 100644 index 0000000000..2100c3bdd0 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp @@ -0,0 +1,37 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +#include "openvino/genai/tokenizer.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +class SpeculativeDecodingPipeline { +protected: + ov::genai::Tokenizer m_tokenizer; + ov::genai::ContinuousBatchingPipeline m_pipeline, m_speculative_pipeline; + + bool m_is_speculative_mode = false; + size_t m_max_candidates_num = 0, m_candidates_num = 0; + + void update_strategy(size_t num_matches); + +public: + SpeculativeDecodingPipeline(const std::string& models_path, + const std::string& speculative_model_path, + size_t start_candidates_number, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + void step(); + + std::vector generate(const std::vector& prompts, const std::vector& sampling_params); + + size_t m_max_matches = 0; + std::vector m_matches_info; + int64_t m_speculative_model_duration = 0; +}; \ No newline at end of file diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index 626a51c5da..c0644d065f 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -29,6 +29,8 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { std::shared_ptr m_impl; public: + ContinuousBatchingPipeline() = default; + ContinuousBatchingPipeline(const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device = "CPU", @@ -52,6 +54,16 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { const ov::AnyMap& plugin_config={} ); + ContinuousBatchingPipeline( + ov::Core& core, + const std::shared_ptr& model, + const ov::genai::Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device="CPU", + const ov::AnyMap& plugin_config={}, + bool is_enable_validation_mode=false + ); + ov::genai::Tokenizer get_tokenizer(); ov::genai::GenerationConfig get_config() const; @@ -80,5 +92,31 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { * @brief finish chat and clear kv cache. 
*/ void finish_chat(); + + // for speculative decoding + void finish_all_requests(); + + struct GeneratedSequence { + uint64_t request_id = 0, sequence_id = 0; + std::vector token_ids; + std::vector log_probs; + + GeneratedSequence(uint64_t req_id, uint64_t seq_id, const std::vector& generated_token_ids, const std::vector& generated_log_probs) : + request_id(req_id), + sequence_id(seq_id), + token_ids(generated_token_ids), + log_probs(generated_log_probs) {}; + }; + + struct UpdateSeqResult { + size_t to_insert, to_remove; + UpdateSeqResult(size_t _to_insert = 0, size_t _to_remove = 0) : to_insert(_to_insert), to_remove(_to_remove) {}; + }; + + std::vector get_generated_sequences(); + UpdateSeqResult update_generated_sequence(const GeneratedSequence& new_sequence); }; + +std::shared_ptr OPENVINO_GENAI_EXPORTS read_model_and_apply_paged_attention(const std::string& model_path, ov::Core& core); + } diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp index 0026ed60d6..5925beb13b 100644 --- a/src/cpp/src/block_manager.hpp +++ b/src/cpp/src/block_manager.hpp @@ -232,6 +232,9 @@ class BlockManager { auto running_sequences = sequence_group->get_not_finished_sequences(); for (size_t idx = 0; idx < running_sequences.size(); ++idx) { auto seq_id = running_sequences[idx]->get_id(); + if (m_block_table.count(seq_id) == 0) { + continue; + } OPENVINO_ASSERT(m_block_table.count(seq_id) > 0, "Invalid sequence group."); auto block_table = m_block_table[seq_id]; free_sequence_partially(seq_id, blocks_num); @@ -444,6 +447,18 @@ class BlockManager { return blocks_count; } + void free_empty_physical_blocks(SequenceGroup::Ptr seq_group) { + size_t num_logical_blocks = seq_group->get_num_logical_blocks(); + for (const auto& sequence : seq_group->get_running_sequences()) { + auto seq_id = sequence->get_id(); + auto& block_table = m_block_table[seq_id]; + size_t num_physical_blocks = block_table.size(); + if (num_physical_blocks > num_logical_blocks) { + free_sequence_partially(seq_id, num_physical_blocks - num_logical_blocks); + } + } + } + std::map> append_slots(SequenceGroup::Ptr seq_group) { size_t num_logical_blocks = seq_group->get_num_logical_blocks(); diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index 8044eddc6c..30ea652724 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -21,7 +21,15 @@ using namespace ov::genai; template struct overloaded : Ts... {using Ts::operator()...;}; template overloaded(Ts...) -> overloaded; -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config); +void apply_paged_attention_transformations(std::shared_ptr model); +void set_type_and_shape_to_kv_cache(std::shared_ptr model, DeviceConfig& device_config); + +std::shared_ptr +ov::genai::read_model_and_apply_paged_attention(const std::string& models_path, ov::Core& core) { + auto model = core.read_model(models_path + "/openvino_model.xml"); + apply_paged_attention_transformations(model); + return model; +} class ContinuousBatchingPipeline::Impl { ov::genai::Tokenizer m_tokenizer; @@ -29,6 +37,7 @@ class ContinuousBatchingPipeline::Impl { std::shared_ptr m_cache_manager; std::shared_ptr m_model_runner; std::shared_ptr m_sampler; + bool m_is_validation_mode_enabled = false; // TODO (mzegla): GenerationConfig is request specific object // and pipeline only uses default rng_seed. 
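
As a usage reference for the ov::Core-based constructor and read_model_and_apply_paged_attention() introduced above, the sketch below shows how a caller could build a main/draft pipeline pair against a shared core, splitting the KV-cache block budget between the two. This is an illustrative sketch, not code from this patch: the helper name build_speculative_pipelines and the even 50/50 split are assumptions (the sample's SpeculativeDecodingPipeline instead weighs the split by each model's per-token KV-cache size).

    #include <string>

    #include "openvino/genai/continuous_batching_pipeline.hpp"
    #include "openvino/genai/tokenizer.hpp"
    #include "openvino/openvino.hpp"

    // Illustrative only: construct a validating main pipeline and a
    // candidate-generating draft pipeline sharing one ov::Core and tokenizer.
    void build_speculative_pipelines(const std::string& main_model_path,
                                     const std::string& draft_model_path,
                                     ov::genai::SchedulerConfig scheduler_config,
                                     const std::string& device = "CPU") {
        ov::Core core;
        auto main_model  = ov::genai::read_model_and_apply_paged_attention(main_model_path, core);
        auto draft_model = ov::genai::read_model_and_apply_paged_attention(draft_model_path, core);

        ov::genai::Tokenizer tokenizer(main_model_path);

        // Split the KV-cache block budget between the two pipelines
        // (a simple even split here; the sample splits proportionally).
        ov::genai::SchedulerConfig main_config = scheduler_config, draft_config = scheduler_config;
        draft_config.num_kv_blocks = scheduler_config.num_kv_blocks / 2;
        main_config.num_kv_blocks  = scheduler_config.num_kv_blocks - draft_config.num_kv_blocks;

        // The last argument enables validation mode: the main pipeline validates
        // the draft's candidate tokens instead of generating one token at a time.
        ov::genai::ContinuousBatchingPipeline main_pipe(core, main_model, tokenizer, main_config, device, {}, true);
        ov::genai::ContinuousBatchingPipeline draft_pipe(core, draft_model, tokenizer, draft_config, device, {}, false);

        // From here, requests are added to both pipelines and they are driven
        // step() by step(), as SpeculativeDecodingPipeline::generate() in the
        // sample above does.
    }
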
@@ -85,17 +94,14 @@ class ContinuousBatchingPipeline::Impl { } } -public: - Impl(const std::string& models_path, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) : - m_tokenizer{tokenizer} { - ov::Core core; - - // The model can be compiled for GPU as well - std::shared_ptr model = core.read_model(models_path + "/openvino_model.xml"); - - DeviceConfig device_config(core, scheduler_config, device, plugin_config); - - apply_paged_attention_transformations(model, device_config); + inline void + compile_model(std::shared_ptr model, + const SchedulerConfig& scheduler_config, + const ov::AnyMap& plugin_config, + const std::string& device, + ov::Core& core) { + DeviceConfig device_config(core, scheduler_config, device); + set_type_and_shape_to_kv_cache(model, device_config); ov::InferRequest infer_request = core.compile_model(model, device_config.get_device(), plugin_config).create_infer_request(); @@ -105,24 +111,49 @@ class ContinuousBatchingPipeline::Impl { infer_request.set_input_tensor(2 + decoder_layer_id * 2, m_cache_manager->get_key_cache(decoder_layer_id)); infer_request.set_input_tensor(2 + decoder_layer_id * 2 + 1, m_cache_manager->get_value_cache(decoder_layer_id)); } - SchedulerConfig updated_config = scheduler_config; // update KV number in scheduler config if (scheduler_config.num_kv_blocks != device_config.get_num_kv_blocks()) { updated_config.num_kv_blocks = device_config.get_num_kv_blocks(); } - m_scheduler = std::make_shared(updated_config); // and finally create model runner m_model_runner = std::make_shared(infer_request, updated_config); m_sampler = std::make_shared(); m_sampler->set_seed(m_generation_config.rng_seed); + m_sampler->set_seed(0); // read default generation config } + inline void pull_awaiting_requests() { + if (m_requests.empty()) { + // Pull awaiting requests + if (!m_awaiting_requests.empty()) { + std::lock_guard lock{m_awaiting_requests_mutex}; + m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end()); + m_awaiting_requests.clear(); + } + } + } + +public: + Impl(const std::string& models_path, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) : + m_tokenizer{tokenizer} { + ov::Core core; + // The model can be compiled for GPU as well + std::shared_ptr model = read_model_and_apply_paged_attention(models_path, core); + compile_model(model, scheduler_config, plugin_config, device, core); + } + Impl(const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& llm_plugin_config, const ov::AnyMap& tokenizer_plugin_config) : Impl{models_path, Tokenizer(models_path, tokenizer_plugin_config), scheduler_config, device, llm_plugin_config} {} + + Impl(ov::Core& core, std::shared_ptr model, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config, bool is_validation_mode = false) : + m_is_validation_mode_enabled(is_validation_mode), + m_tokenizer{tokenizer} { + compile_model(model, scheduler_config, plugin_config, device, core); + } ov::genai::GenerationConfig get_config() const { return m_generation_config; @@ -167,18 +198,15 @@ class ContinuousBatchingPipeline::Impl { static ManualTimer step_timer("step()"); step_timer.start(); - // Pull awaiting requests - { - std::lock_guard lock{m_awaiting_requests_mutex}; - m_requests.insert(m_requests.end(), 
m_awaiting_requests.begin(), m_awaiting_requests.end()); - m_awaiting_requests.clear(); - } + pull_awaiting_requests(); m_pipeline_metrics.requests = m_requests.size(); Scheduler::Output scheduler_output; { static ManualTimer timer("scheduling"); timer.start(); + // todo: iefode: to move to other place? + m_scheduler->clean_empty_blocks(m_requests); scheduler_output = m_scheduler->schedule(m_requests); m_pipeline_metrics.scheduled_requests = scheduler_output.m_scheduled_sequence_groups_ids.size(); m_pipeline_metrics.cache_usage = scheduler_output.m_cache_usage; @@ -221,7 +249,7 @@ class ContinuousBatchingPipeline::Impl { { static ManualTimer timer("sample"); timer.start(); - sampler_output = m_sampler->sample(m_requests, logits); + sampler_output = m_sampler->sample(m_requests, logits, m_is_validation_mode_enabled); timer.end(); } @@ -382,6 +410,113 @@ class ContinuousBatchingPipeline::Impl { m_is_chat_conversation = false; m_history.clear(); }; + + void finish_all_requests() { + while (!m_requests.empty()) { + const auto& request = *m_requests.rbegin(); + for (const auto& sequence : request->get_sequences()) { + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_beam_search_info(request->get_request_id()); + m_requests.pop_back(); + } + } + + std::vector get_generated_sequences() { + pull_awaiting_requests(); + std::vector result; + for (const auto& request : m_requests) { + const auto request_id = request->get_request_id(); + for (const auto& sequence : request->get_sequences()) { + auto generated_ids = sequence->get_generated_ids(); + auto log_probs = sequence->get_log_probs(); + result.emplace_back(request_id, sequence->get_grouped_id(), generated_ids, log_probs); + } + } + return result; + + } + + ContinuousBatchingPipeline::UpdateSeqResult + update_generated_sequence(const ContinuousBatchingPipeline::GeneratedSequence& candidate_sequence) { + pull_awaiting_requests(); + bool is_empty_generated_tokens = false; + for (auto& request : m_requests) { + if (candidate_sequence.request_id == request->get_request_id()) { + bool is_seq_exists = false; + // todo: iefode: multiseq + size_t to_remove_tokens = 0, to_insert_tokens = 0; + for (auto& sequence : request->get_sequences()) { + if (candidate_sequence.sequence_id == sequence->get_grouped_id()) { + is_seq_exists = true; + auto present_ids = sequence->get_generated_ids(); + const auto& candidate_ids = candidate_sequence.token_ids; + + // remove extra tokens from sequence + { + auto token_idx = std::min(present_ids.size(), candidate_ids.size()); + if (token_idx) { + while (token_idx-- > 0) { + if (present_ids[token_idx] == candidate_ids[token_idx]) { + break; + } + } + to_remove_tokens = present_ids.size() - (token_idx + 1); + if (to_remove_tokens > 0) { + const auto gen_ids_before = sequence->get_generated_ids(); + sequence->remove_last_n_tokens(to_remove_tokens); + present_ids = sequence->get_generated_ids(); + const size_t gen_len_before = gen_ids_before.size(), + gen_len_after = present_ids.size(); + if (gen_len_after == 0) { + is_empty_generated_tokens = true; + } + OPENVINO_ASSERT(gen_len_after < gen_len_before); + for (size_t i = gen_len_after; i < gen_len_before; ++i) { + m_sampler->update_logit_processor(request->get_request_id(), gen_ids_before[i]); + } + } + } + } + // insert new tokens to sequence + { + OPENVINO_ASSERT(candidate_ids.size() >= present_ids.size()); + const auto& candidate_log_probs = candidate_sequence.log_probs; + const size_t start_id = std::min(present_ids.size(), candidate_ids.size()), + 
stop_id = std::max(present_ids.size(), candidate_ids.size()); + to_insert_tokens = stop_id - start_id; + for (size_t i = start_id; i < stop_id; ++i) { + sequence->append_token(candidate_ids[i], i < candidate_log_probs.size() ? candidate_log_probs[i] : 0.f); + } + } + } + break; + } + if (!is_seq_exists) { + Sequence::Ptr new_sequence(new Sequence(candidate_sequence.sequence_id)); + const auto& generated_tokens = candidate_sequence.token_ids; + const auto& generated_log_probs = candidate_sequence.log_probs; + for (size_t i = 0; i < generated_tokens.size(); ++i) { + new_sequence->append_token(generated_tokens[i], generated_log_probs[i]); + } + request->add_sequence(new_sequence); + } + if (!is_empty_generated_tokens) { + // in case of non-prompt we need to take prev tokens + token to validate + if (request->get_num_processed_tokens()) + ++to_insert_tokens; + if (to_remove_tokens > 0) { + request->decrease_processed_tokens(to_remove_tokens); + } + // to validate tokens/extend kv-cache before generation + request->set_validation_len(to_insert_tokens); + } else if (to_remove_tokens > 0) { + request->update_processed_tokens_num(request->get_prompt_len()); + } + return ContinuousBatchingPipeline::UpdateSeqResult(to_insert_tokens, to_remove_tokens); + } + } + } }; ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::string& models_path, @@ -400,6 +535,16 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const ov::AnyMap& plugin_config ) : m_impl{std::make_shared(model_path, tokenizer, scheduler_config, device, plugin_config)} {} +ContinuousBatchingPipeline::ContinuousBatchingPipeline( + ov::Core& core, + const std::shared_ptr& model, + const ov::genai::Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config, + bool is_enable_validation_mode +) : m_impl{std::make_shared(core, model, tokenizer, scheduler_config, device, plugin_config, is_enable_validation_mode)} {} + ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() { return m_impl->get_tokenizer(); } @@ -443,3 +588,16 @@ void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { void ContinuousBatchingPipeline::finish_chat() { m_impl->finish_chat(); }; +void ContinuousBatchingPipeline::finish_all_requests() { + m_impl->finish_all_requests(); +} + +std::vector +ContinuousBatchingPipeline::get_generated_sequences() { + return m_impl->get_generated_sequences(); +} + +ContinuousBatchingPipeline::UpdateSeqResult +ContinuousBatchingPipeline::update_generated_sequence(const ContinuousBatchingPipeline::GeneratedSequence& new_sequence) { + return m_impl->update_generated_sequence(new_sequence); +} diff --git a/src/cpp/src/paged_attention_transformations.cpp b/src/cpp/src/paged_attention_transformations.cpp index 3f343048ea..57d8c655fd 100644 --- a/src/cpp/src/paged_attention_transformations.cpp +++ b/src/cpp/src/paged_attention_transformations.cpp @@ -16,12 +16,13 @@ inline ov::PartialShape to_partial_with_dyn_0_dim(const ov::Shape& static_shape) return partial_shape; } -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config) { +void apply_paged_attention_transformations(std::shared_ptr model) { const ov::op::util::VariableVector& variables = model->get_variables(); OPENVINO_ASSERT(!variables.empty(), "Model is supposed to be stateful"); - ov::pass::SDPAToPagedAttention().run_on_model(model); +} +void set_type_and_shape_to_kv_cache(std::shared_ptr model, DeviceConfig& device_config) { 
const ov::ParameterVector& parameters = model->get_parameters(); size_t num_layers = std::count_if(parameters.begin(), parameters.end(), [](std::shared_ptr parameter) { diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp index caf8cd2cc4..0384102667 100644 --- a/src/cpp/src/scheduler.hpp +++ b/src/cpp/src/scheduler.hpp @@ -64,7 +64,7 @@ class Scheduler { void clean_empty_blocks(std::vector& seq_groups) { for (const auto& seq_group : seq_groups) - m_block_manager.free_group_partially(seq_group, m_block_manager.num_free_blocks()); + m_block_manager.free_empty_physical_blocks(seq_group); } const std::vector& get_block_table(const Sequence& seq) { From 611c8bbda4ab5b548a063aea372395607f32650a Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 6 Sep 2024 20:01:43 +0400 Subject: [PATCH 3/4] remove debug prints --- ...ntinuous_batching_speculative_decoding.cpp | 11 -- .../speculative_decoding_pipeline.cpp | 113 ++++++------------ .../speculative_decoding_pipeline.hpp | 9 +- .../cpp/greedy_causal_lm/greedy_causal_lm.cpp | 12 +- .../speculative_decoding_lm.cpp | 83 +++++++++++-- .../genai/continuous_batching_pipeline.hpp | 2 +- src/cpp/src/continuous_batching_pipeline.cpp | 36 ++++-- src/cpp/src/generation_config.cpp | 6 +- src/cpp/src/sampler.hpp | 19 +-- src/cpp/src/sequence_group.hpp | 5 + 10 files changed, 166 insertions(+), 130 deletions(-) diff --git a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp index f5589bfcdc..f66797e4d6 100644 --- a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp +++ b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp @@ -91,7 +91,6 @@ int main(int argc, char* argv[]) try { // It's possible to construct a Tokenizer from a different path. // If the Tokenizer isn't specified, it's loaded from the same folder. 
SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, scheduler_config, "CPU"); - auto start_time = std::chrono::system_clock::now(); std::vector generation_results = pipe.generate(prompts, sampling_params); for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) { @@ -121,16 +120,6 @@ int main(int argc, char* argv[]) try { } std::cout << std::endl; } - auto end_time = std::chrono::system_clock::now(); - std::chrono::duration duration = end_time - start_time; - std::cout << std::endl; - std::cout << "Duration: " << duration.count() << std::endl; - std::cout << "Infer number: " << pipe.m_matches_info.size() << std::endl; - std::cout << "MAX matches number: " << pipe.m_max_matches << std::endl; - auto a = std::accumulate(pipe.m_matches_info.begin(), pipe.m_matches_info.end(), 0); - std::cout << "AVG matches number: " << (float(a) / pipe.m_matches_info.size()) << std::endl; - double c = double(pipe.m_speculative_model_duration) * 100 / std::chrono::duration_cast(duration).count(); - std::cout << "Speculative model time duration: " << pipe.m_speculative_model_duration * 1e-9 << " in %: " << c << std::endl; } catch (const std::exception& error) { std::cerr << error.what() << '\n'; return EXIT_FAILURE; diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp index 9c4b96da06..c9dab81f12 100644 --- a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp @@ -33,6 +33,7 @@ SpeculativeDecodingPipeline::SpeculativeDecodingPipeline( ov::genai::SchedulerConfig model_scheduler_config = scheduler_config, assisting_scheduler_config = scheduler_config; if (m_is_speculative_mode) { + // split KV cache to 2 caches for speculative and base parts size_t model_cache_size = get_kv_cache_size(model), assisting_cache_size = get_kv_cache_size(assisting_model); auto k = float(assisting_cache_size) / (model_cache_size + assisting_cache_size); @@ -45,38 +46,25 @@ SpeculativeDecodingPipeline::SpeculativeDecodingPipeline( } m_pipeline = ov::genai::ContinuousBatchingPipeline(core, model, m_tokenizer, model_scheduler_config, device, plugin_config, true); - } void SpeculativeDecodingPipeline::step() { - // std::cout << "=======STEP==================" << std::endl; std::vector candidate_sequences; if (m_is_speculative_mode) { - // generate candidates using small model - // std::cout << "num_candidates: " << candidates_number << std::endl; - for (size_t i = 0; i < m_candidates_num; ++i) { - auto start_time = std::chrono::system_clock::now(); + // find minimum(candidates_number, seq_len) to generate candidates + size_t min_candidates_number = m_candidates_num; + for (auto& request : m_to_generate_length) { + if (request.second < min_candidates_number && request.second > 0) { + min_candidates_number = request.second; + } + } + // generate candidates by speculative model + for (size_t i = 0; i < min_candidates_number; ++i) { m_speculative_pipeline.step(); - auto end_time = std::chrono::system_clock::now(); - m_speculative_model_duration += std::chrono::duration_cast(end_time - start_time).count(); } - // put candidates to model cache + // put candidates to model KV cache candidate_sequences = m_speculative_pipeline.get_generated_sequences(); - // todo: remove debug code - // for (const auto& s : candidate_sequences) { - // 
std::cout << "ASSISTANT: "; - // for (const auto& d : s.token_ids) { - // std::cout << d << " "; - // } - // std::cout << std::endl; - // for (const auto& d : s.log_probs) { - // std::cout << d << " "; - // } - // std::cout << std::endl; - // std::cout << decode(s.token_ids) << std::endl; - // } - for (const auto& candidate : candidate_sequences) { m_pipeline.update_generated_sequence(candidate); } @@ -86,34 +74,25 @@ void SpeculativeDecodingPipeline::step() { m_pipeline.step(); if (m_is_speculative_mode) { - // todo: iefode: remove debug prints auto checked_sequences = m_pipeline.get_generated_sequences(); - // todo: remove debug code - // for (const auto& s : checked_sequences) { - // std::cout << "MODEL: "; - // for (const auto& d : s.token_ids) { - // std::cout << d << " "; - // } - // std::cout << std::endl; - // for (const auto& d : s.log_probs) { - // std::cout << d << " "; - // } - // std::cout << std::endl; - // std::cout << decode(s.token_ids) << std::endl; - // std::cout << std::endl; - // } - - ov::genai::ContinuousBatchingPipeline::UpdateSeqResult update_result; + size_t max_removed_token_cnt = 0; for (const auto& checked_sequence : checked_sequences) { - update_result = m_speculative_pipeline.update_generated_sequence(checked_sequence); + auto update_result = m_speculative_pipeline.update_generated_sequence(checked_sequence); + max_removed_token_cnt = std::max(max_removed_token_cnt, update_result.to_remove); + } + OPENVINO_ASSERT(m_candidates_num >= max_removed_token_cnt); + auto num_matches = m_candidates_num - max_removed_token_cnt; + update_strategy(num_matches); + + // update to generate tokens + for (auto& request : m_to_generate_length) { + if (request.second > num_matches) { + request.second -= (num_matches + 1); + } else { + request.second = 0; + m_speculative_pipeline.finish_request(request.first); + } } - - OPENVINO_ASSERT(m_candidates_num >= update_result.to_remove); - // if (update_result.to_remove) { - // std::cout << "to_remove: " << update_result.to_remove << std::endl; - // } - update_strategy(m_candidates_num - update_result.to_remove); - // std::cout << "=========================" << std::endl; } } @@ -126,6 +105,7 @@ SpeculativeDecodingPipeline::generate(const std::vector& prompts, std::vector generations, speculative_generations; for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { generations.push_back(m_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id])); + m_to_generate_length.insert({ request_id, sampling_params[request_id].max_new_tokens }); if (m_is_speculative_mode) { auto assisting_sampling_params = sampling_params[request_id]; assisting_sampling_params.max_new_tokens += m_max_candidates_num; @@ -138,7 +118,8 @@ SpeculativeDecodingPipeline::generate(const std::vector& prompts, step(); } if (m_is_speculative_mode) { - m_speculative_pipeline.finish_all_requests(); + // finish all speculative requests + m_speculative_pipeline.finish_request(-1); } std::vector encoded_results; @@ -180,38 +161,16 @@ SpeculativeDecodingPipeline::generate(const std::vector& prompts, return decoded_results; } -inline size_t get_median(std::vector values) { - const auto size = values.size(); - if (size == 0) { - return 0; - } - size_t offset = values.size() / 2; - - auto it = values.begin() + offset; - std::nth_element(values.begin(), it, values.end()); - - if (size % 2 != 0) { - return *it; - } - auto it_1 = values.begin() + offset - 1; - std::nth_element(values.begin(), it_1, values.end()); - return (*it + *it_1) / 2; -} 
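The reworked update_strategy() that follows drops the median-based heuristic removed above in favour of a simpler rule: grow the candidate window by 2 when every candidate was accepted, otherwise shrink it by 1, never below 1. A standalone sketch of that rule is shown here for reference; only the control logic mirrors the patch, while adapt_candidates() and the numbers in the driver are illustrative.

// Hedged sketch of the new candidate-count adaptation rule, detached from the pipeline classes.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

size_t adapt_candidates(size_t candidates_num, size_t max_candidates_num, size_t num_matches) {
    if (max_candidates_num == 0)
        return candidates_num;                                   // speculation disabled
    if (num_matches == candidates_num)                           // every candidate accepted
        return std::min(candidates_num + 2, max_candidates_num);
    return static_cast<size_t>(                                  // at least one mismatch
        std::max<int64_t>(static_cast<int64_t>(candidates_num) - 1, 1));
}

int main() {
    size_t k = 5;                                                // start_candidates_number
    for (size_t matches : {5u, 7u, 8u, 2u, 0u})                  // accepted counts per iteration
        std::cout << (k = adapt_candidates(k, /*max*/ 8, matches)) << ' ';
    std::cout << '\n';                                           // prints: 7 8 8 7 6
}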
- - -void SpeculativeDecodingPipeline::update_strategy(size_t num_matches) { - // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << m_candidates_num << std::endl; +void SpeculativeDecodingPipeline::update_strategy(const size_t num_matches) { + // dynamically adjust number of generated candidates based on number of matches + // we want to balance the benefits of getting candidates tokens correct with the + // cost of forecasting incorrect candidates tokens. if (m_max_candidates_num == 0) { return; } - - if (m_max_matches < num_matches) { - m_max_matches = num_matches; - } if (num_matches == m_candidates_num) { - m_candidates_num = std::min(std::max(m_candidates_num + 1, m_max_matches), m_max_candidates_num); + m_candidates_num = std::min(m_candidates_num + 2, m_max_candidates_num); } else { - m_candidates_num = num_matches > 0 ? num_matches : std::max(get_median(m_matches_info), size_t(1)); + m_candidates_num = std::max(int64_t(m_candidates_num) - 1, int64_t(1)); } - m_matches_info.push_back(num_matches); } \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp index 2100c3bdd0..4f03b80aa2 100644 --- a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp @@ -16,6 +16,7 @@ class SpeculativeDecodingPipeline { bool m_is_speculative_mode = false; size_t m_max_candidates_num = 0, m_candidates_num = 0; + std::map m_to_generate_length; void update_strategy(size_t num_matches); @@ -28,10 +29,6 @@ class SpeculativeDecodingPipeline { const ov::AnyMap& plugin_config = {}); void step(); - - std::vector generate(const std::vector& prompts, const std::vector& sampling_params); - - size_t m_max_matches = 0; - std::vector m_matches_info; - int64_t m_speculative_model_duration = 0; + std::vector + generate(const std::vector& prompts, const std::vector& sampling_params); }; \ No newline at end of file diff --git a/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp b/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp index 09e6af65e8..158f6fa577 100644 --- a/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp +++ b/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp @@ -5,17 +5,25 @@ int main(int argc, char* argv[]) try { if (3 > argc) - throw std::runtime_error(std::string{"Usage: "} + argv[0] + " \"\""); + throw std::runtime_error(std::string{"Usage: "} + argv[0] + " \"\" "); std::string model_path = argv[1]; std::string prompt = argv[2]; + std::string len = argv[3]; std::string device = "CPU"; // GPU can be used as well ov::genai::LLMPipeline pipe(model_path, device); ov::genai::GenerationConfig config; - config.max_new_tokens = 100; + config.max_new_tokens = std::stoi(len); + auto start_time = std::chrono::system_clock::now(); std::string result = pipe.generate(prompt, config); + auto end_time = std::chrono::system_clock::now(); std::cout << result << std::endl; + + std::chrono::duration duration = end_time - start_time; + std::cout << std::endl; + std::cout << "Duration: " << duration.count() << std::endl; + std::cout << "Infer number: " << config.max_new_tokens << std::endl; } catch (const std::exception& error) { try { std::cerr << error.what() << '\n'; diff --git a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp index 
de2d2f8837..83eb74fe6d 100644 --- a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp +++ b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp @@ -147,6 +147,10 @@ class AssistedCandidateGenerator { size_t draft_model_seq_length = 0; public: + size_t max_match = 0, avg_match = 0; + int64_t m_speculative_model_duration = 0; + std::vector m_matches_info; + AssistedCandidateGenerator(ov::InferRequest draft_model, const size_t max_seq_length, const size_t num_pred_tokens, @@ -174,8 +178,10 @@ class AssistedCandidateGenerator { draft_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); draft_model.get_tensor("beam_idx").data()[0] = 0; - + auto start_time = std::chrono::system_clock::now(); draft_model.infer(); + auto end_time = std::chrono::system_clock::now(); + m_speculative_model_duration += std::chrono::duration_cast(end_time - start_time).count(); auto logits = draft_model.get_tensor("logits"); size_t vocab_size = logits.get_shape().back(); @@ -220,8 +226,49 @@ class AssistedCandidateGenerator { } else { num_pred_tokens = std::max(int64_t(num_pred_tokens) - 1, int64_t(1)); } + // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << num_pred_tokens << std::endl; + max_match = std::max(num_matches, max_match); + avg_match += num_matches; } + +// inline size_t get_median(std::vector values) { +// const auto size = values.size(); +// if (size == 0) { +// return 0; +// } +// size_t offset = values.size() / 2; + +// auto it = values.begin() + offset; +// std::nth_element(values.begin(), it, values.end()); + +// if (size % 2 != 0) { +// return *it; +// } +// auto it_1 = values.begin() + offset - 1; +// std::nth_element(values.begin(), it_1, values.end()); +// return (*it + *it_1) / 2; +// } + + +// void update_candidate_strategy(size_t num_matches) { +// // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << m_candidates_num << std::endl; +// if (max_pred_tokens == 0) { +// return; +// } + +// if (max_match < num_matches) { +// max_match = num_matches; +// } +// if (num_matches == num_pred_tokens) { +// num_pred_tokens = std::min(std::max(num_pred_tokens + 1, max_match), max_pred_tokens); +// } else { +// num_pred_tokens = num_matches > 0 ? num_matches : std::max(get_median(m_matches_info), size_t(1)); +// } +// m_matches_info.push_back(num_matches); +// avg_match += num_matches; +// } + void update_kv_cache(const size_t seq_length) { // this is the case when main model accepted all candidates from draft model // we need to collect kv cache for out_of_kv_cache_token by infering it @@ -250,8 +297,8 @@ int64_t get_eos_token(const std::shared_ptr tokenizer) { } // namespace int main(int argc, char* argv[]) try { - if (argc != 4) { - throw std::runtime_error(std::string{"Usage: "} + argv[0] + "
''");
+    if (argc != 4 + 2) {
+        throw std::runtime_error(std::string{"Usage: "} + argv[0] + "
'' "); } // tokenizer model @@ -281,9 +328,9 @@ int main(int argc, char* argv[]) try { ov::InferRequest main_model = core.compile_model(ov_main_model, "CPU").create_infer_request(); - size_t max_sequence_length = 100; + size_t max_sequence_length = std::stoi(std::string{argv[5]}); - AssistedCandidateGenerator candidateGenerator{draft_model, max_sequence_length, 5, draft_model_seq_len_axis}; + AssistedCandidateGenerator candidateGenerator{draft_model, max_sequence_length, std::stoi(std::string{argv[4]}), draft_model_seq_len_axis}; main_model.set_tensor("input_ids", input_ids); main_model.set_tensor("attention_mask", attention_mask); @@ -296,11 +343,15 @@ int main(int argc, char* argv[]) try { main_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); main_model.get_tensor("beam_idx").data()[0] = 0; + auto start_time = std::chrono::system_clock::now(); + size_t iteration_cnt = 0; + // To coollect kv-cache for the and to get the next token run the very first infer request candidateGenerator.generate_next_token( std::vector(input_ids.data(), input_ids.data() + input_ids.get_size())); main_model.infer(); + ++iteration_cnt; size_t vocab_size = draft_model.get_tensor("logits").get_shape().back(); OPENVINO_ASSERT(vocab_size == main_model.get_tensor("logits").get_shape().back(), @@ -332,6 +383,10 @@ int main(int argc, char* argv[]) try { while (out_token != EOS_TOKEN && seq_len < max_sequence_length) { // generate candidates from the draft model std::vector candidates = candidateGenerator.generate_candidates(out_token); + // std::cout << "ASIISTING MODEL: " << std::endl; + // for (size_t i = 0; i < candidates.size(); ++i) { + // std::cout << "N max_sampled_tokens: " << candidates[i] << std::endl; + // } size_t candidates_size = candidates.size(); // For the main network, candidates_size + 1 tokens will be fed at once in a single infer request. @@ -349,6 +404,7 @@ int main(int argc, char* argv[]) try { std::iota(position_ids.data(), position_ids.data() + position_ids.get_size(), seq_len); main_model.infer(); + ++iteration_cnt; data_logits = logits.data(); // [BATCH_SIZE, K, vocab_size] @@ -359,18 +415,19 @@ int main(int argc, char* argv[]) try { // 2.2 it it's mismatch, stop iteration but still accept current token as it was last token generated by // model from a valid sequence. 
size_t accepted_tokens_number = 0; + // std::cout << "MODEL: " << std::endl; for (size_t i = 0; i < candidates_size + 1; i++) { auto start = data_logits + vocab_size * i; auto stop = data_logits + vocab_size * (i + 1); out_token = std::max_element(start, stop) - start; + // std::cout << "N max_sampled_tokens: " << out_token << std::endl; if (out_token == EOS_TOKEN) { break; } - text_streamer.put(out_token); accepted_tokens_number++; - + text_streamer.put(out_token); if (i == candidates_size || out_token != candidates[i]) { break; } @@ -384,6 +441,7 @@ int main(int argc, char* argv[]) try { if (accepted_tokens_number > 0) { candidateGenerator.update_candidate_strategy(accepted_tokens_number - 1); } + // std::cout << "=========================" << std::endl; candidateGenerator.update_kv_cache(seq_len); update_kv_cache(main_model, main_model_seq_len_axis, seq_len); @@ -398,6 +456,17 @@ int main(int argc, char* argv[]) try { // it is called for education purposes: draft_model.reset_state(); main_model.reset_state(); + + auto end_time = std::chrono::system_clock::now(); + std::chrono::duration duration = end_time - start_time; + std::cout << std::endl; + std::cout << "Duration: " << duration.count() << std::endl; + std::cout << "Infer number: " << iteration_cnt << std::endl; + std::cout << "MAX matches number: " << candidateGenerator.max_match << std::endl; + // auto a = std::accumulate(pipe.m_matches_info.begin(), pipe.m_matches_info.end(), 0); + std::cout << "AVG matches number: " << float(candidateGenerator.avg_match) / iteration_cnt << std::endl; + double c = double(candidateGenerator.m_speculative_model_duration) * 100 / std::chrono::duration_cast(duration).count(); + std::cout << "Speculative model time duration in %: " << c << std::endl; } catch (const std::exception& error) { try { std::cerr << error.what() << '\n'; diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index c0644d065f..2e8cc53ca6 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -94,7 +94,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { void finish_chat(); // for speculative decoding - void finish_all_requests(); + void finish_request(int64_t request_id = -1); struct GeneratedSequence { uint64_t request_id = 0, sequence_id = 0; diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index 30ea652724..a6bce7a7bd 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -411,14 +411,29 @@ class ContinuousBatchingPipeline::Impl { m_history.clear(); }; - void finish_all_requests() { - while (!m_requests.empty()) { - const auto& request = *m_requests.rbegin(); - for (const auto& sequence : request->get_sequences()) { - m_scheduler->free_sequence(sequence->get_id()); + void finish_request(int64_t request_id) { + if (request_id == -1) { + while (!m_requests.empty()) { + const auto& request = *m_requests.rbegin(); + for (const auto& sequence : request->get_sequences()) { + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_beam_search_info(request->get_request_id()); + m_requests.pop_back(); + } + } else { + for (size_t i = 0; i < m_requests.size(); ++i) { + auto& request = m_requests[i]; + if (request->get_request_id() != request_id) { + continue; + } + for (const auto& sequence : request->get_sequences()) 
{ + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_beam_search_info(request->get_request_id()); + m_requests.erase(m_requests.begin() + i); + break; } - m_sampler->clear_beam_search_info(request->get_request_id()); - m_requests.pop_back(); } } @@ -467,7 +482,7 @@ class ContinuousBatchingPipeline::Impl { sequence->remove_last_n_tokens(to_remove_tokens); present_ids = sequence->get_generated_ids(); const size_t gen_len_before = gen_ids_before.size(), - gen_len_after = present_ids.size(); + gen_len_after = present_ids.size(); if (gen_len_after == 0) { is_empty_generated_tokens = true; } @@ -516,6 +531,7 @@ class ContinuousBatchingPipeline::Impl { return ContinuousBatchingPipeline::UpdateSeqResult(to_insert_tokens, to_remove_tokens); } } + return {0, 0}; } }; @@ -588,8 +604,8 @@ void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { void ContinuousBatchingPipeline::finish_chat() { m_impl->finish_chat(); }; -void ContinuousBatchingPipeline::finish_all_requests() { - m_impl->finish_all_requests(); +void ContinuousBatchingPipeline::finish_request(int64_t request_id) { + m_impl->finish_request(request_id); } std::vector diff --git a/src/cpp/src/generation_config.cpp b/src/cpp/src/generation_config.cpp index f84600eb33..d05a62809c 100644 --- a/src/cpp/src/generation_config.cpp +++ b/src/cpp/src/generation_config.cpp @@ -161,9 +161,9 @@ GenerationConfig greedy() { greedy_config.temperature = 0.0f; greedy_config.ignore_eos = true; greedy_config.num_return_sequences = 1; - greedy_config.repetition_penalty = 3.0f; - greedy_config.presence_penalty = 0.1f; - greedy_config.frequency_penalty = 0.01f; + // greedy_config.repetition_penalty = 3.0f; + // greedy_config.presence_penalty = 0.1f; + // greedy_config.frequency_penalty = 0.01f; greedy_config.max_new_tokens = 30; return greedy_config; } diff --git a/src/cpp/src/sampler.hpp b/src/cpp/src/sampler.hpp index d39b59835c..9d62cd24b6 100644 --- a/src/cpp/src/sampler.hpp +++ b/src/cpp/src/sampler.hpp @@ -229,15 +229,6 @@ class Sampler { max_index = i; } } - // todo: remove: to print: - // std::vector logit_vector_copy(logits.m_size); - // logit_vector_copy = logits.m_size.m_data; - // std::sort(logit_vector_copy.begin(), logit_vector_copy.end(), [](Token& lhs, Token& rhs){ return lhs.m_log_prob > rhs.m_log_prob; }); - // std::cout << "N max_sampled_tokens: "; - // for (size_t i = 0; i < 5; ++i) { - // std::cout << logit_vector_copy[i].m_index << " "; - // } - // std::cout << std::endl; return Token(logits.m_data[max_index], max_index); } @@ -328,11 +319,14 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, auto register_new_token = [&](const Token& sampled_token_id, size_t running_sequence_id, bool is_extend_sequence = true) { logit_processor.register_new_generated_token(sampled_token_id.m_index); // increment seq len only for one sequence in sequence group to sync them - if (running_sequence_id == num_running_sequences - 1) { - logit_processor.increment_gen_tokens(); - } if (is_extend_sequence) { running_sequences[running_sequence_id]->append_token(sampled_token_id.m_index, sampled_token_id.m_log_prob); + } else { + OPENVINO_ASSERT(logit_processor.get_gen_token_len() < running_sequences[running_sequence_id]->get_generated_len()); + running_sequences[running_sequence_id]->update_log_prob(sampled_token_id.m_log_prob, logit_processor.get_gen_token_len()); + } + if (running_sequence_id == num_running_sequences - 1) { + logit_processor.increment_gen_tokens(); } }; for (size_t running_sequence_id = 0; 
running_sequence_id < num_running_sequences; ++running_sequence_id) { @@ -390,7 +384,6 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, sampled_token_id.m_index = *it; } } - register_new_token(sampled_token_id, running_sequence_id, is_extend_sequence); if (token_id_per_seq == 0) { break; diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 55c1a154aa..6d80b74796 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -146,6 +146,11 @@ class Sequence { return m_generated_probs; } + void update_log_prob(float new_log_prob, size_t idx) { + OPENVINO_ASSERT(idx <= m_generated_probs.size()); + m_generated_probs[idx] = new_log_prob; + } + float get_cumulative_log_probs() const { return m_cumulative_log_prob; } From 6acf230217144b3c537311b9f48fd40beda29ca0 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 6 Sep 2024 21:21:28 +0400 Subject: [PATCH 4/4] remove extra --- ...ntinuous_batching_speculative_decoding.cpp | 1 + .../speculative_decoding_lm.cpp | 66 ------------------- 2 files changed, 1 insertion(+), 66 deletions(-) diff --git a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp index f66797e4d6..782003d789 100644 --- a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp +++ b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp @@ -51,6 +51,7 @@ int main(int argc, char* argv[]) try { // create dataset std::vector prompt_examples = { + "The United Arab Emirates[c] (UAE), or simply the Emirates,[d] is a country in West Asia, in the Middle East, at the eastern end of the Arabian Peninsula. 
It is a federal, elective monarchy composed of seven emirates, with Abu Dhabi as its capital.[13] It shares land borders with Oman to the east and northwest, and with Saudi Arabia to the southwest; as well as maritime borders in the Persian Gulf with Qatar and Iran, and with Oman in the Gulf of Oman.", "What is OpenVINO?", "How are you?", "What is your name?", diff --git a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp index 66c7c231fa..f535ebb751 100644 --- a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp +++ b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp @@ -147,10 +147,6 @@ class AssistedCandidateGenerator { size_t draft_model_seq_length = 0; public: - size_t max_match = 0, avg_match = 0; - int64_t m_speculative_model_duration = 0; - std::vector m_matches_info; - AssistedCandidateGenerator(ov::InferRequest draft_model, const size_t max_seq_length, const size_t num_pred_tokens, @@ -178,10 +174,7 @@ class AssistedCandidateGenerator { draft_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); draft_model.get_tensor("beam_idx").data()[0] = 0; - auto start_time = std::chrono::system_clock::now(); draft_model.infer(); - auto end_time = std::chrono::system_clock::now(); - m_speculative_model_duration += std::chrono::duration_cast(end_time - start_time).count(); auto logits = draft_model.get_tensor("logits"); size_t vocab_size = logits.get_shape().back(); @@ -226,49 +219,8 @@ class AssistedCandidateGenerator { } else { num_pred_tokens = std::max(int64_t(num_pred_tokens) - 1, int64_t(1)); } - // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << num_pred_tokens << std::endl; - max_match = std::max(num_matches, max_match); - avg_match += num_matches; } - -// inline size_t get_median(std::vector values) { -// const auto size = values.size(); -// if (size == 0) { -// return 0; -// } -// size_t offset = values.size() / 2; - -// auto it = values.begin() + offset; -// std::nth_element(values.begin(), it, values.end()); - -// if (size % 2 != 0) { -// return *it; -// } -// auto it_1 = values.begin() + offset - 1; -// std::nth_element(values.begin(), it_1, values.end()); -// return (*it + *it_1) / 2; -// } - - -// void update_candidate_strategy(size_t num_matches) { -// // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << m_candidates_num << std::endl; -// if (max_pred_tokens == 0) { -// return; -// } - -// if (max_match < num_matches) { -// max_match = num_matches; -// } -// if (num_matches == num_pred_tokens) { -// num_pred_tokens = std::min(std::max(num_pred_tokens + 1, max_match), max_pred_tokens); -// } else { -// num_pred_tokens = num_matches > 0 ? 
num_matches : std::max(get_median(m_matches_info), size_t(1)); -// } -// m_matches_info.push_back(num_matches); -// avg_match += num_matches; -// } - void update_kv_cache(const size_t seq_length) { // this is the case when main model accepted all candidates from draft model // we need to collect kv cache for out_of_kv_cache_token by infering it @@ -383,10 +335,6 @@ int main(int argc, char* argv[]) try { while (out_token != EOS_TOKEN && seq_len < max_sequence_length) { // generate candidates from the draft model std::vector candidates = candidateGenerator.generate_candidates(out_token); - // std::cout << "ASIISTING MODEL: " << std::endl; - // for (size_t i = 0; i < candidates.size(); ++i) { - // std::cout << "N max_sampled_tokens: " << candidates[i] << std::endl; - // } size_t candidates_size = candidates.size(); // For the main network, candidates_size + 1 tokens will be fed at once in a single infer request. @@ -415,13 +363,11 @@ int main(int argc, char* argv[]) try { // 2.2 it it's mismatch, stop iteration but still accept current token as it was last token generated by // model from a valid sequence. size_t accepted_tokens_number = 0; - // std::cout << "MODEL: " << std::endl; for (size_t i = 0; i < candidates_size + 1; i++) { auto start = data_logits + vocab_size * i; auto stop = data_logits + vocab_size * (i + 1); out_token = std::max_element(start, stop) - start; - // std::cout << "N max_sampled_tokens: " << out_token << std::endl; if (out_token == EOS_TOKEN) { break; } @@ -441,7 +387,6 @@ int main(int argc, char* argv[]) try { if (accepted_tokens_number > 0) { candidateGenerator.update_candidate_strategy(accepted_tokens_number - 1); } - // std::cout << "=========================" << std::endl; candidateGenerator.update_kv_cache(seq_len); update_kv_cache(main_model, main_model_seq_len_axis, seq_len); @@ -456,17 +401,6 @@ int main(int argc, char* argv[]) try { // it is called for education purposes: draft_model.reset_state(); main_model.reset_state(); - - auto end_time = std::chrono::system_clock::now(); - std::chrono::duration duration = end_time - start_time; - std::cout << std::endl; - std::cout << "Duration: " << duration.count() << std::endl; - std::cout << "Infer number: " << iteration_cnt << std::endl; - std::cout << "MAX matches number: " << candidateGenerator.max_match << std::endl; - // auto a = std::accumulate(pipe.m_matches_info.begin(), pipe.m_matches_info.end(), 0); - std::cout << "AVG matches number: " << float(candidateGenerator.avg_match) / iteration_cnt << std::endl; - double c = double(candidateGenerator.m_speculative_model_duration) * 100 / std::chrono::duration_cast(duration).count(); - std::cout << "Speculative model time duration in %: " << c << std::endl; } catch (const std::exception& error) { try { std::cerr << error.what() << '\n';
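For reference, the acceptance rule that speculative_decoding_lm.cpp applies to the main model's output, and that the timing and debug code removed above was instrumenting, can be distilled into a small standalone helper. This sketch is illustrative rather than part of the patch: accept_candidates() and its parameters are made-up names, and the real sample additionally streams tokens and trims both KV caches inside the same loop.

// Hedged sketch of the greedy acceptance loop: the main model scores candidates.size() + 1
// positions in a single infer, each position is argmax-ed, and tokens are accepted until the
// first disagreement with the draft model's candidates (or EOS).
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<int64_t> accept_candidates(const std::vector<float>& logits,   // [candidates + 1, vocab]
                                       size_t vocab_size,
                                       const std::vector<int64_t>& candidates,
                                       int64_t eos_token) {
    std::vector<int64_t> accepted;
    for (size_t i = 0; i <= candidates.size(); ++i) {             // candidates_size + 1 positions
        auto start = logits.begin() + i * vocab_size;
        auto stop  = start + vocab_size;
        int64_t out_token = std::max_element(start, stop) - start;   // greedy pick per position
        if (out_token == eos_token)
            break;
        accepted.push_back(out_token);
        // Stop at the first disagreement with the draft; the mismatching token is still kept,
        // because the main model produced it from a fully validated prefix.
        if (i == candidates.size() || out_token != candidates[i])
            break;
    }
    return accepted;
}

Note that the last accepted token is the main model's own continuation rather than a confirmed draft token, which is why the sample reports accepted_tokens_number - 1 matches to update_candidate_strategy().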