diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt
index a25ace7558..9fc0642275 100644
--- a/samples/CMakeLists.txt
+++ b/samples/CMakeLists.txt
@@ -6,7 +6,9 @@ add_subdirectory(cpp/beam_search_causal_lm)
 add_subdirectory(cpp/chat_sample)
 add_subdirectory(cpp/continuous_batching_accuracy)
 add_subdirectory(cpp/continuous_batching_benchmark)
-# add_subdirectory(cpp/continuous_batching_speculative_decoding)
+# todo: iefode
+# add_subdirectory(cpp/continuous_batching_prompt_lookup)
+add_subdirectory(cpp/continuous_batching_speculative_decoding)
 add_subdirectory(cpp/greedy_causal_lm)
 add_subdirectory(cpp/multinomial_causal_lm)
 add_subdirectory(cpp/prompt_lookup_decoding_lm)
diff --git a/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt b/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt
new file mode 100644
index 0000000000..b122782a2e
--- /dev/null
+++ b/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt
@@ -0,0 +1,25 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+# start of dependencies
+
+include(FetchContent)
+
+FetchContent_Declare(cxxopts
+    URL https://github.com/jarro2783/cxxopts/archive/refs/tags/v3.1.1.tar.gz
+    URL_HASH SHA256=523175f792eb0ff04f9e653c90746c12655f10cb70f1d5e6d6d9491420298a08)
+
+FetchContent_Declare(nlohmann_json
+    URL https://github.com/nlohmann/json/archive/refs/tags/v3.11.3.tar.gz
+    URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406)
+
+FetchContent_MakeAvailable(cxxopts)
+FetchContent_MakeAvailable(nlohmann_json)
+
+find_package(OpenVINO REQUIRED COMPONENTS Runtime)
+
+# end of dependencies
+
+set(TARGET_NAME continuous_batching_prompt_lookup)
+add_executable(${TARGET_NAME} ${TARGET_NAME}.cpp "prompt_lookup_pipeline.hpp" "prompt_lookup_pipeline.cpp")
+target_link_libraries(${TARGET_NAME} PRIVATE openvino::genai cxxopts::cxxopts)
\ No newline at end of file
diff --git a/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp b/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp
new file mode 100644
index 0000000000..3219d2e74f
--- /dev/null
+++ b/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp
@@ -0,0 +1,138 @@
+// Copyright (C) 2023-2024 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+#include <openvino/openvino.hpp>
+#include <cxxopts.hpp>
+
+#include "openvino/genai/generation_config.hpp"
+
+#include "prompt_lookup_pipeline.hpp"
+
+void print_generation_result(const ov::genai::GenerationResult& generation_result) {
+    for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) {
+        std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl;
+    }
+}
+
+int main(int argc, char* argv[]) try {
+    // Command line options
+
+    cxxopts::Options options("accuracy_sample", "Help command");
+
+    options.add_options()
+    ("n,num_prompts", "A number of prompts", cxxopts::value<size_t>()->default_value("1"))
+    ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", cxxopts::value<bool>()->default_value("false"))
+    ("m,model", "Path to model and tokenizers base directory", cxxopts::value<std::string>()->default_value("."))
+    ("k,candidates_number", "candidates_number", cxxopts::value<size_t>()->default_value("5"))
+    ("ngram", "Ngram", cxxopts::value<size_t>()->default_value("5"))
+    ("g,generated_len", "generated_len", cxxopts::value<size_t>()->default_value("30"))
+    ("h,help", "Print usage");
+
+    cxxopts::ParseResult result;
+    try {
+        result = options.parse(argc, argv);
+    } catch (const cxxopts::exceptions::exception& e) {
+        std::cout << e.what() << "\n\n";
+        std::cout << options.help() << std::endl;
+        return EXIT_FAILURE;
+    }
+
+    if (result.count("help")) {
+        std::cout << options.help() << std::endl;
+        return EXIT_SUCCESS;
+    }
+
+    const size_t num_prompts = result["num_prompts"].as<size_t>();
+    const bool dynamic_split_fuse = result["dynamic_split_fuse"].as<bool>();
+    const std::string models_path = result["model"].as<std::string>();
+    const size_t k = result["candidates_number"].as<size_t>();
+    const size_t g = result["generated_len"].as<size_t>();
+    const size_t n = result["ngram"].as<size_t>();
+
+    // create dataset
+
+    std::vector<std::string> prompt_examples = {
+        // "What is OpenVINO?",
+        // "How are you?",
+        "code: ```for (const auto& a : b) { std::cout << a << std::endl; }```",
+        "Tell me something about Canada",
+        "What is OpenVINO?",
+    };
+
+    auto greedy = ov::genai::greedy();
+    greedy.max_new_tokens = g;
+
+    std::vector<ov::genai::GenerationConfig> sampling_params_examples {
+        // ov::genai::beam_search(),
+        greedy,
+        // ov::genai::multinomial(),
+    };
+
+    std::vector<std::string> prompts(num_prompts);
+    std::vector<ov::genai::GenerationConfig> sampling_params(num_prompts);
+
+    for (size_t request_id = 0; request_id < num_prompts; ++request_id) {
+        prompts[request_id] = prompt_examples[request_id % prompt_examples.size()];
+        sampling_params[request_id] = sampling_params_examples[request_id % sampling_params_examples.size()];
+    }
+
+    // Perform the inference
+
+    ov::genai::SchedulerConfig scheduler_config;
+    // batch size
+    scheduler_config.max_num_batched_tokens = 256;
+    // cache params
+    scheduler_config.num_kv_blocks = 364;
+    scheduler_config.block_size = 32;
+    // mode - vLLM or dynamic_split_fuse
+    scheduler_config.dynamic_split_fuse = dynamic_split_fuse;
+    // vLLM specific params
+    scheduler_config.max_num_seqs = 2;
+
+    // It's possible to construct a Tokenizer from a different path.
+    // If the Tokenizer isn't specified, it's loaded from the same folder.
+    PromptLookupPipeline pipe(models_path, k, n, ov::genai::Tokenizer{models_path}, scheduler_config, "CPU");
+    auto start_time = std::chrono::system_clock::now();
+    std::vector<ov::genai::GenerationResult> generation_results = pipe.generate(prompts, sampling_params);
+
+    for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) {
+        const ov::genai::GenerationResult & generation_result = generation_results[request_id];
+        std::cout << "Question: " << prompts[request_id] << std::endl;
+        switch (generation_result.m_status)
+        {
+        case ov::genai::GenerationStatus::FINISHED:
+            print_generation_result(generation_result);
+            break;
+        case ov::genai::GenerationStatus::IGNORED:
+            std::cout << "Request was ignored due to lack of memory." << std::endl;
+            if (generation_result.m_generation_ids.size() > 0) {
+                std::cout << "Partial result:" << std::endl;
+                print_generation_result(generation_result);
+            }
+            break;
+        case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE:
+            std::cout << "Request was aborted." << std::endl;
+            if (generation_result.m_generation_ids.size() > 0) {
+                std::cout << "Partial result:" << std::endl;
+                print_generation_result(generation_result);
+            }
+            break;
+        default:
+            break;
+        }
+        std::cout << std::endl;
+    }
+    auto end_time = std::chrono::system_clock::now();
+    std::chrono::duration<double> duration = end_time - start_time;
+    std::cout << std::endl;
+    std::cout << "Duration: " << duration.count() << std::endl;
+    std::cout << "Infer number: " << pipe.infer_cnt << std::endl;
+    std::cout << "MAX matches number: " << pipe.max_matches << std::endl;
+    std::cout << "AVG matches number: " << (float(pipe.avg_matches) / pipe.infer_cnt) << std::endl;
+} catch (const std::exception& error) {
+    std::cerr << error.what() << '\n';
+    return EXIT_FAILURE;
+} catch (...) {
+    std::cerr << "Non-exception object thrown\n";
+    return EXIT_FAILURE;
+}
\ No newline at end of file
diff --git a/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp
new file mode 100644
index 0000000000..af95d62f4b
--- /dev/null
+++ b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp
@@ -0,0 +1,189 @@
+// Copyright (C) 2023-2024 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+#include "prompt_lookup_pipeline.hpp"
+
+PromptLookupPipeline::PromptLookupPipeline(const std::string& models_path,
+                                           size_t candidates_number,
+                                           size_t ngram_size,
+                                           const ov::genai::SchedulerConfig& scheduler_config,
+                                           const std::string& device,
+                                           const ov::AnyMap& plugin_config) :
+    PromptLookupPipeline(models_path, candidates_number, ngram_size, ov::genai::Tokenizer(models_path), scheduler_config, device, plugin_config) {};
+
+PromptLookupPipeline::PromptLookupPipeline(const std::string& models_path,
+                                           size_t candidates_number,
+                                           size_t ngram_size,
+                                           const ov::genai::Tokenizer& tokenizer,
+                                           const ov::genai::SchedulerConfig& scheduler_config,
+                                           const std::string& device,
+                                           const ov::AnyMap& plugin_config) {
+    m_tokenizer = tokenizer;
+    set_k(candidates_number);
+    max_ngram_size = ngram_size;
+
+    model_pipeline = ov::genai::ContinuousBatchingPipeline(models_path, m_tokenizer, scheduler_config, device, plugin_config);
+    model_pipeline.enable_validation_mode();
+}
+
+ov::genai::PipelineMetrics PromptLookupPipeline::get_metrics() const {
+    return model_pipeline.get_metrics();
+}
+
+void PromptLookupPipeline::step() {
+    std::cout << "=======STEP==================" << std::endl;
+    bool is_updated = false;
+    if (is_speculative_mode) {
+        // predict tokens using prompt
+        std::cout << "num_candidates: " << candidates_number << std::endl;
+        for (const auto& whole_input : model_pipeline.get_prompts_with_generated_tokens()) {
+            auto updated_input = whole_input;
+            const auto& input_ids = whole_input.token_ids;
+            const size_t input_length = input_ids.size();
+            for (int32_t ngram_size = max_ngram_size; ngram_size > 0; ngram_size--) {
+                std::vector<int64_t> ngram = std::vector<int64_t>{input_ids.cend() - ngram_size, input_ids.cend()};
+                std::cout << "ngram: " << std::endl;
+                for (const auto& a : ngram) {
+                    std::cout << a;
+                }
+                std::cout << std::endl;
+
+                // find ngram match in input_ids
+                size_t ngram_i = 0;
+                for (size_t input_i = 0; input_i < input_length - ngram_size; input_i++) {
+                    if (ngram[ngram_i] != input_ids[input_i]) {
+                        ngram_i = 0;
+                        continue;
+                    }
+                    ngram_i++;
+
+                    if (ngram_i < ngram_size) {
+                        continue;
+                    }
+
+                    // match found with the end at input_i
+                    size_t available_num_pred = std::min(input_length - (input_i + 1), candidates_number);
+
+                    // return candidates with length of available_num_pred
+                    std::vector<int64_t> candidate{input_ids.cbegin() + input_i + 1,
+                                                   input_ids.cbegin() + input_i + 1 + available_num_pred};
+                    updated_input.token_ids = candidate;
+                    updated_input.log_probs = std::vector<float>(candidate.size(), 0.f);
+
+                    model_pipeline.update_generated_sequence(updated_input);
+                    break;
+                }
+                if (whole_input.token_ids != updated_input.token_ids) {
+                    is_updated = true;
+                    break;
+                }
+            }
+        }
+
+        // put candidates to model cache
+        auto candidate_sequences = model_pipeline.get_generated_sequences();
+        // todo: remove debug code
+        for (const auto& s : candidate_sequences) {
+            std::cout << "ASSISTANT: ";
+            for (const auto& d : s.token_ids) {
+                std::cout << d << " ";
+            }
+            // std::cout << std::endl;
+            // for (const auto& d : s.log_probs) {
+            //     std::cout << d << " ";
+            // }
+            std::cout << std::endl;
+            std::cout << decode(s.token_ids) << std::endl;
+        }
+    }
+
+    const auto gen_seq_before = model_pipeline.get_generated_sequences();
+
+    // validate candidates and generate 1 new token
+    model_pipeline.step();
+
+    if (is_speculative_mode && is_updated) {
+        // todo: remove debug code
+        for (const auto& s : model_pipeline.get_generated_sequences()) {
+            std::cout << "MODEL: ";
+            for (const auto& d : s.token_ids) {
+                std::cout << d << " ";
+            }
+            // std::cout << std::endl;
+            // for (const auto& d : s.log_probs) {
+            //     std::cout << d << " ";
+            // }
+            std::cout << std::endl;
+            std::cout << decode(s.token_ids) << std::endl;
+            std::cout << std::endl;
+        }
+
+        // todo: iefode: remove debug prints
+        for (const auto& gen_seq_after : model_pipeline.get_generated_sequences()) {
+            const auto& candidate_seq = gen_seq_before[gen_seq_after.request_id];
+            size_t before_len = candidate_seq.token_ids.size(),
+                   after_len = gen_seq_after.token_ids.size();
+            size_t dist = is_updated ? (after_len <= before_len ? (before_len - after_len) : candidates_number) : 0;
+            update_strategy(dist);
+        }
+        // ov::genai::ContinuousBatchingPipeline::UpdateSeqResult update_result;
+        // for (const auto& checked_sequence : checked_sequences) {
+        //     update_result = assisting_pipeline.update_generated_sequence(checked_sequence);
+        // }
+
+        // OPENVINO_ASSERT(candidates_number >= update_result.to_remove);
+        // if (update_result.to_remove) {
+        //     std::cout << "to_remove: " << update_result.to_remove << std::endl;
+        // }
+        // update_strategy(candidates_number - update_result.to_remove);
+        // std::cout << "=========================" << std::endl;
+    }
+}
+
+void PromptLookupPipeline::update_strategy(size_t num_matches) {
+    std::cout << "num_matches: " << num_matches << std::endl;
+    max_matches = std::max(max_matches, num_matches);
+    avg_matches += num_matches;
+    if (max_candidates_number == 0) {
+        return;
+    }
+    if (num_matches == candidates_number) {
+        candidates_number = std::min(candidates_number + 2, max_candidates_number);
+    } else {
+        candidates_number = std::max(int64_t(candidates_number) - 1, int64_t(1));
+    }
+}
+
+
+void PromptLookupPipeline::set_k(size_t new_default_k) {
+    candidates_number = new_default_k;
+    max_candidates_number = new_default_k * 2;
+    is_speculative_mode = candidates_number > 0;
+}
+
+bool PromptLookupPipeline::has_non_finished_requests() {
+    return model_pipeline.has_non_finished_requests();
+}
+
+
+std::vector<ov::genai::GenerationHandle>
+PromptLookupPipeline::generate_sequences(
+    const std::vector<std::string> prompts,
+    std::vector<ov::genai::GenerationConfig> sampling_params) {
+    OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
+    OPENVINO_ASSERT(prompts.size() == sampling_params.size());
+
+    std::vector<ov::genai::GenerationHandle> generations, assisting_generations;
+    for (size_t request_id = 0; request_id < prompts.size(); ++request_id) {
+        generations.push_back(model_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id]));
+    }
+
+    while (has_non_finished_requests()) {
+        step();
+        infer_cnt++;
+    }
+
+    return generations;
+}
\ No newline at end of file
diff --git a/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp
new file mode 100644
index 0000000000..c48f70bb9e
--- /dev/null
+++ b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp
@@ -0,0 +1,46 @@
+// Copyright (C) 2023-2024 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+#pragma once
+
+#include "openvino/genai/cb_basic_pipeline.hpp"
+#include "openvino/genai/continuous_batching_pipeline.hpp"
+
+class PromptLookupPipeline : public ov::genai::BasicPipeline {
+    ov::genai::ContinuousBatchingPipeline model_pipeline;
+    size_t candidates_number = 0, max_candidates_number = 0, max_ngram_size = 0;
+    bool is_speculative_mode = false;
+    std::map<std::string, std::vector<int64_t>> m_encoded_prompt;
+
+    std::vector<ov::genai::GenerationHandle> generate_sequences(
+        const std::vector<std::string> prompts, std::vector<ov::genai::GenerationConfig> sampling_params) override;
+
+
+public:
+    size_t infer_cnt = 0, max_matches = 0, avg_matches = 0;
+
+    PromptLookupPipeline(const std::string& models_path,
+                         size_t candidates_number,
+                         size_t max_ngram_size,
+                         const ov::genai::SchedulerConfig& scheduler_config,
+                         const std::string& device = "CPU",
+                         const ov::AnyMap& plugin_config = {});
+
+    PromptLookupPipeline(const std::string& models_path,
+                         size_t candidates_number,
+                         size_t max_ngram_size,
+                         const ov::genai::Tokenizer& tokenizer,
+                         const ov::genai::SchedulerConfig& scheduler_config,
+                         const std::string& device = "CPU",
+                         const ov::AnyMap& plugin_config = {});
+
+    ov::genai::PipelineMetrics get_metrics() const override;
+
+    void step() override;
+
+    bool has_non_finished_requests() override;
+
+    void set_k(size_t new_default_k);
+
+    void update_strategy(size_t num_matches);
+};
\ No newline at end of file
diff --git a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp
index d3a4f01100..f5589bfcdc 100644
--- a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp
+++ b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp
@@ -5,7 +5,7 @@
 #include <cxxopts.hpp>
 
 #include "openvino/genai/generation_config.hpp"
-#include "openvino/genai/continuous_batching_pipeline.hpp"
+#include "speculative_decoding_pipeline.hpp"
 
 void print_generation_result(const ov::genai::GenerationResult& generation_result) {
     for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) {
@@ -90,7 +90,7 @@ int main(int argc, char* argv[]) try {
 
     // It's possible to construct a Tokenizer from a different path.
    // If the Tokenizer isn't specified, it's loaded from the same folder.
- SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, ov::genai::Tokenizer{models_path}, scheduler_config, "CPU"); + SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, scheduler_config, "CPU"); auto start_time = std::chrono::system_clock::now(); std::vector generation_results = pipe.generate(prompts, sampling_params); @@ -125,12 +125,12 @@ int main(int argc, char* argv[]) try { std::chrono::duration duration = end_time - start_time; std::cout << std::endl; std::cout << "Duration: " << duration.count() << std::endl; - std::cout << "Infer number: " << pipe.matches_info.size() << std::endl; - std::cout << "MAX matches number: " << pipe.max_matches << std::endl; - auto a = std::accumulate(pipe.matches_info.begin(), pipe.matches_info.end(), 0); - std::cout << "AVG matches number: " << (float(a) / pipe.matches_info.size()) << std::endl; - double c = double(pipe.assisting_model_duration) * 100 / std::chrono::duration_cast(duration).count(); - std::cout << "Speculative model time duration: " << pipe.assisting_model_duration * 1e-9 << " in %: " << c << std::endl; + std::cout << "Infer number: " << pipe.m_matches_info.size() << std::endl; + std::cout << "MAX matches number: " << pipe.m_max_matches << std::endl; + auto a = std::accumulate(pipe.m_matches_info.begin(), pipe.m_matches_info.end(), 0); + std::cout << "AVG matches number: " << (float(a) / pipe.m_matches_info.size()) << std::endl; + double c = double(pipe.m_speculative_model_duration) * 100 / std::chrono::duration_cast(duration).count(); + std::cout << "Speculative model time duration: " << pipe.m_speculative_model_duration * 1e-9 << " in %: " << c << std::endl; } catch (const std::exception& error) { std::cerr << error.what() << '\n'; return EXIT_FAILURE; diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp new file mode 100644 index 0000000000..9c4b96da06 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp @@ -0,0 +1,217 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "speculative_decoding_pipeline.hpp" + +inline size_t get_kv_cache_size(const std::shared_ptr& model) { + const auto& parameters = model->get_parameters(); + // extract num_kv_heads and head_size + size_t kv_caches_inputs_offset = 2; + ov::PartialShape k_shape = parameters[kv_caches_inputs_offset]->get_partial_shape(); + OPENVINO_ASSERT(k_shape.rank().get_length() == 3, "KV cache shape is expected to have rank 3, while shape is ", k_shape); + size_t num_kv_heads = k_shape[1].get_length(), head_size = k_shape[2].get_length(); + return num_kv_heads * head_size; +} + +SpeculativeDecodingPipeline::SpeculativeDecodingPipeline( + const std::string& models_path, + const std::string& speculative_models_path, + size_t start_candidates_number, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + m_tokenizer = ov::genai::Tokenizer(models_path, plugin_config); + + m_candidates_num = start_candidates_number; + m_max_candidates_num = m_candidates_num * 2; + m_is_speculative_mode = m_candidates_num > 0; + + ov::Core core; + std::shared_ptr model = ov::genai::read_model_and_apply_paged_attention(models_path, core), + assisting_model = ov::genai::read_model_and_apply_paged_attention(speculative_models_path, core); + + ov::genai::SchedulerConfig 
model_scheduler_config = scheduler_config, + assisting_scheduler_config = scheduler_config; + if (m_is_speculative_mode) { + size_t model_cache_size = get_kv_cache_size(model), + assisting_cache_size = get_kv_cache_size(assisting_model); + auto k = float(assisting_cache_size) / (model_cache_size + assisting_cache_size); + auto cache_size = scheduler_config.num_kv_blocks; + auto assisted_cache_size = size_t(cache_size * k); + cache_size -= assisted_cache_size; + model_scheduler_config.num_kv_blocks = cache_size; + assisting_scheduler_config.num_kv_blocks = assisted_cache_size; + m_speculative_pipeline = ov::genai::ContinuousBatchingPipeline(core, assisting_model, m_tokenizer, assisting_scheduler_config, device, plugin_config, false); + } + + m_pipeline = ov::genai::ContinuousBatchingPipeline(core, model, m_tokenizer, model_scheduler_config, device, plugin_config, true); + +} + +void SpeculativeDecodingPipeline::step() { + // std::cout << "=======STEP==================" << std::endl; + std::vector candidate_sequences; + if (m_is_speculative_mode) { + // generate candidates using small model + // std::cout << "num_candidates: " << candidates_number << std::endl; + for (size_t i = 0; i < m_candidates_num; ++i) { + auto start_time = std::chrono::system_clock::now(); + m_speculative_pipeline.step(); + auto end_time = std::chrono::system_clock::now(); + m_speculative_model_duration += std::chrono::duration_cast(end_time - start_time).count(); + } + + // put candidates to model cache + candidate_sequences = m_speculative_pipeline.get_generated_sequences(); + // todo: remove debug code + // for (const auto& s : candidate_sequences) { + // std::cout << "ASSISTANT: "; + // for (const auto& d : s.token_ids) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // std::cout << decode(s.token_ids) << std::endl; + // } + + for (const auto& candidate : candidate_sequences) { + m_pipeline.update_generated_sequence(candidate); + } + } + + // validate candidates and generate 1 new token + m_pipeline.step(); + + if (m_is_speculative_mode) { + // todo: iefode: remove debug prints + auto checked_sequences = m_pipeline.get_generated_sequences(); + // todo: remove debug code + // for (const auto& s : checked_sequences) { + // std::cout << "MODEL: "; + // for (const auto& d : s.token_ids) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + // std::cout << std::endl; + // std::cout << decode(s.token_ids) << std::endl; + // std::cout << std::endl; + // } + + ov::genai::ContinuousBatchingPipeline::UpdateSeqResult update_result; + for (const auto& checked_sequence : checked_sequences) { + update_result = m_speculative_pipeline.update_generated_sequence(checked_sequence); + } + + OPENVINO_ASSERT(m_candidates_num >= update_result.to_remove); + // if (update_result.to_remove) { + // std::cout << "to_remove: " << update_result.to_remove << std::endl; + // } + update_strategy(m_candidates_num - update_result.to_remove); + // std::cout << "=========================" << std::endl; + } +} + +std::vector +SpeculativeDecodingPipeline::generate(const std::vector& prompts, + const std::vector& sampling_params) { + OPENVINO_ASSERT(!m_pipeline.has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. 
Use ContinuousBatchingPipeline::add_request"); + OPENVINO_ASSERT(prompts.size() == sampling_params.size()); + + std::vector generations, speculative_generations; + for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { + generations.push_back(m_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id])); + if (m_is_speculative_mode) { + auto assisting_sampling_params = sampling_params[request_id]; + assisting_sampling_params.max_new_tokens += m_max_candidates_num; + assisting_sampling_params.min_new_tokens += m_max_candidates_num; + speculative_generations.push_back(m_speculative_pipeline.add_request(request_id, prompts[request_id], assisting_sampling_params)); + } + } + + while (m_pipeline.has_non_finished_requests()) { + step(); + } + if (m_is_speculative_mode) { + m_speculative_pipeline.finish_all_requests(); + } + + std::vector encoded_results; + for (size_t generation_idx = 0; generation_idx < generations.size(); ++generation_idx) { + const auto& generation = generations[generation_idx]; + ov::genai::EncodedGenerationResult result; + result.m_request_id = 1; + std::vector generation_outputs = generation->read_all(); + std::sort(generation_outputs.begin(), generation_outputs.end(), [=] (ov::genai::GenerationOutput& r1, ov::genai::GenerationOutput& r2) { + return r1.score > r2.score; + }); + + auto num_outputs = std::min(sampling_params[generation_idx].num_return_sequences, generation_outputs.size()); + for (size_t generation_output_idx = 0; generation_output_idx < num_outputs; ++generation_output_idx) { + const auto& generation_output = generation_outputs[generation_output_idx]; + result.m_generation_ids.push_back(std::move(generation_output.generated_token_ids)); + result.m_scores.push_back(generation_output.score); + } + result.m_status = generation->get_status(); + encoded_results.push_back(std::move(result)); + } + + OPENVINO_ASSERT(encoded_results.size() == prompts.size()); + + std::vector decoded_results; + for (ov::genai::EncodedGenerationResult& res : encoded_results) { + std::vector generated; + generated.reserve(res.m_generation_ids.size()); + for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { + generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); + } + decoded_results.push_back(ov::genai::GenerationResult{ + res.m_request_id, + std::move(generated), + std::move(res.m_scores), + res.m_status + }); + } + return decoded_results; +} + +inline size_t get_median(std::vector values) { + const auto size = values.size(); + if (size == 0) { + return 0; + } + size_t offset = values.size() / 2; + + auto it = values.begin() + offset; + std::nth_element(values.begin(), it, values.end()); + + if (size % 2 != 0) { + return *it; + } + auto it_1 = values.begin() + offset - 1; + std::nth_element(values.begin(), it_1, values.end()); + return (*it + *it_1) / 2; +} + + +void SpeculativeDecodingPipeline::update_strategy(size_t num_matches) { + // std::cout << "num_matches: " << num_matches << " m_candidates_num: " << m_candidates_num << std::endl; + if (m_max_candidates_num == 0) { + return; + } + + if (m_max_matches < num_matches) { + m_max_matches = num_matches; + } + if (num_matches == m_candidates_num) { + m_candidates_num = std::min(std::max(m_candidates_num + 1, m_max_matches), m_max_candidates_num); + } else { + m_candidates_num = num_matches > 0 ? 
num_matches : std::max(get_median(m_matches_info), size_t(1)); + } + m_matches_info.push_back(num_matches); +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp new file mode 100644 index 0000000000..2100c3bdd0 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp @@ -0,0 +1,37 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +#include "openvino/genai/tokenizer.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +class SpeculativeDecodingPipeline { +protected: + ov::genai::Tokenizer m_tokenizer; + ov::genai::ContinuousBatchingPipeline m_pipeline, m_speculative_pipeline; + + bool m_is_speculative_mode = false; + size_t m_max_candidates_num = 0, m_candidates_num = 0; + + void update_strategy(size_t num_matches); + +public: + SpeculativeDecodingPipeline(const std::string& models_path, + const std::string& speculative_model_path, + size_t start_candidates_number, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + void step(); + + std::vector generate(const std::vector& prompts, const std::vector& sampling_params); + + size_t m_max_matches = 0; + std::vector m_matches_info; + int64_t m_speculative_model_duration = 0; +}; \ No newline at end of file diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index 626a51c5da..c0644d065f 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -29,6 +29,8 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { std::shared_ptr m_impl; public: + ContinuousBatchingPipeline() = default; + ContinuousBatchingPipeline(const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device = "CPU", @@ -52,6 +54,16 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { const ov::AnyMap& plugin_config={} ); + ContinuousBatchingPipeline( + ov::Core& core, + const std::shared_ptr& model, + const ov::genai::Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device="CPU", + const ov::AnyMap& plugin_config={}, + bool is_enable_validation_mode=false + ); + ov::genai::Tokenizer get_tokenizer(); ov::genai::GenerationConfig get_config() const; @@ -80,5 +92,31 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { * @brief finish chat and clear kv cache. 
*/ void finish_chat(); + + // for speculative decoding + void finish_all_requests(); + + struct GeneratedSequence { + uint64_t request_id = 0, sequence_id = 0; + std::vector token_ids; + std::vector log_probs; + + GeneratedSequence(uint64_t req_id, uint64_t seq_id, const std::vector& generated_token_ids, const std::vector& generated_log_probs) : + request_id(req_id), + sequence_id(seq_id), + token_ids(generated_token_ids), + log_probs(generated_log_probs) {}; + }; + + struct UpdateSeqResult { + size_t to_insert, to_remove; + UpdateSeqResult(size_t _to_insert = 0, size_t _to_remove = 0) : to_insert(_to_insert), to_remove(_to_remove) {}; + }; + + std::vector get_generated_sequences(); + UpdateSeqResult update_generated_sequence(const GeneratedSequence& new_sequence); }; + +std::shared_ptr OPENVINO_GENAI_EXPORTS read_model_and_apply_paged_attention(const std::string& model_path, ov::Core& core); + } diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp index 0026ed60d6..5925beb13b 100644 --- a/src/cpp/src/block_manager.hpp +++ b/src/cpp/src/block_manager.hpp @@ -232,6 +232,9 @@ class BlockManager { auto running_sequences = sequence_group->get_not_finished_sequences(); for (size_t idx = 0; idx < running_sequences.size(); ++idx) { auto seq_id = running_sequences[idx]->get_id(); + if (m_block_table.count(seq_id) == 0) { + continue; + } OPENVINO_ASSERT(m_block_table.count(seq_id) > 0, "Invalid sequence group."); auto block_table = m_block_table[seq_id]; free_sequence_partially(seq_id, blocks_num); @@ -444,6 +447,18 @@ class BlockManager { return blocks_count; } + void free_empty_physical_blocks(SequenceGroup::Ptr seq_group) { + size_t num_logical_blocks = seq_group->get_num_logical_blocks(); + for (const auto& sequence : seq_group->get_running_sequences()) { + auto seq_id = sequence->get_id(); + auto& block_table = m_block_table[seq_id]; + size_t num_physical_blocks = block_table.size(); + if (num_physical_blocks > num_logical_blocks) { + free_sequence_partially(seq_id, num_physical_blocks - num_logical_blocks); + } + } + } + std::map> append_slots(SequenceGroup::Ptr seq_group) { size_t num_logical_blocks = seq_group->get_num_logical_blocks(); diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index 8044eddc6c..30ea652724 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -21,7 +21,15 @@ using namespace ov::genai; template struct overloaded : Ts... {using Ts::operator()...;}; template overloaded(Ts...) -> overloaded; -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config); +void apply_paged_attention_transformations(std::shared_ptr model); +void set_type_and_shape_to_kv_cache(std::shared_ptr model, DeviceConfig& device_config); + +std::shared_ptr +ov::genai::read_model_and_apply_paged_attention(const std::string& models_path, ov::Core& core) { + auto model = core.read_model(models_path + "/openvino_model.xml"); + apply_paged_attention_transformations(model); + return model; +} class ContinuousBatchingPipeline::Impl { ov::genai::Tokenizer m_tokenizer; @@ -29,6 +37,7 @@ class ContinuousBatchingPipeline::Impl { std::shared_ptr m_cache_manager; std::shared_ptr m_model_runner; std::shared_ptr m_sampler; + bool m_is_validation_mode_enabled = false; // TODO (mzegla): GenerationConfig is request specific object // and pipeline only uses default rng_seed. 
@@ -85,17 +94,14 @@ class ContinuousBatchingPipeline::Impl { } } -public: - Impl(const std::string& models_path, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) : - m_tokenizer{tokenizer} { - ov::Core core; - - // The model can be compiled for GPU as well - std::shared_ptr model = core.read_model(models_path + "/openvino_model.xml"); - - DeviceConfig device_config(core, scheduler_config, device, plugin_config); - - apply_paged_attention_transformations(model, device_config); + inline void + compile_model(std::shared_ptr model, + const SchedulerConfig& scheduler_config, + const ov::AnyMap& plugin_config, + const std::string& device, + ov::Core& core) { + DeviceConfig device_config(core, scheduler_config, device); + set_type_and_shape_to_kv_cache(model, device_config); ov::InferRequest infer_request = core.compile_model(model, device_config.get_device(), plugin_config).create_infer_request(); @@ -105,24 +111,49 @@ class ContinuousBatchingPipeline::Impl { infer_request.set_input_tensor(2 + decoder_layer_id * 2, m_cache_manager->get_key_cache(decoder_layer_id)); infer_request.set_input_tensor(2 + decoder_layer_id * 2 + 1, m_cache_manager->get_value_cache(decoder_layer_id)); } - SchedulerConfig updated_config = scheduler_config; // update KV number in scheduler config if (scheduler_config.num_kv_blocks != device_config.get_num_kv_blocks()) { updated_config.num_kv_blocks = device_config.get_num_kv_blocks(); } - m_scheduler = std::make_shared(updated_config); // and finally create model runner m_model_runner = std::make_shared(infer_request, updated_config); m_sampler = std::make_shared(); m_sampler->set_seed(m_generation_config.rng_seed); + m_sampler->set_seed(0); // read default generation config } + inline void pull_awaiting_requests() { + if (m_requests.empty()) { + // Pull awaiting requests + if (!m_awaiting_requests.empty()) { + std::lock_guard lock{m_awaiting_requests_mutex}; + m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end()); + m_awaiting_requests.clear(); + } + } + } + +public: + Impl(const std::string& models_path, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) : + m_tokenizer{tokenizer} { + ov::Core core; + // The model can be compiled for GPU as well + std::shared_ptr model = read_model_and_apply_paged_attention(models_path, core); + compile_model(model, scheduler_config, plugin_config, device, core); + } + Impl(const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& llm_plugin_config, const ov::AnyMap& tokenizer_plugin_config) : Impl{models_path, Tokenizer(models_path, tokenizer_plugin_config), scheduler_config, device, llm_plugin_config} {} + + Impl(ov::Core& core, std::shared_ptr model, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config, bool is_validation_mode = false) : + m_is_validation_mode_enabled(is_validation_mode), + m_tokenizer{tokenizer} { + compile_model(model, scheduler_config, plugin_config, device, core); + } ov::genai::GenerationConfig get_config() const { return m_generation_config; @@ -167,18 +198,15 @@ class ContinuousBatchingPipeline::Impl { static ManualTimer step_timer("step()"); step_timer.start(); - // Pull awaiting requests - { - std::lock_guard lock{m_awaiting_requests_mutex}; - m_requests.insert(m_requests.end(), 
m_awaiting_requests.begin(), m_awaiting_requests.end()); - m_awaiting_requests.clear(); - } + pull_awaiting_requests(); m_pipeline_metrics.requests = m_requests.size(); Scheduler::Output scheduler_output; { static ManualTimer timer("scheduling"); timer.start(); + // todo: iefode: to move to other place? + m_scheduler->clean_empty_blocks(m_requests); scheduler_output = m_scheduler->schedule(m_requests); m_pipeline_metrics.scheduled_requests = scheduler_output.m_scheduled_sequence_groups_ids.size(); m_pipeline_metrics.cache_usage = scheduler_output.m_cache_usage; @@ -221,7 +249,7 @@ class ContinuousBatchingPipeline::Impl { { static ManualTimer timer("sample"); timer.start(); - sampler_output = m_sampler->sample(m_requests, logits); + sampler_output = m_sampler->sample(m_requests, logits, m_is_validation_mode_enabled); timer.end(); } @@ -382,6 +410,113 @@ class ContinuousBatchingPipeline::Impl { m_is_chat_conversation = false; m_history.clear(); }; + + void finish_all_requests() { + while (!m_requests.empty()) { + const auto& request = *m_requests.rbegin(); + for (const auto& sequence : request->get_sequences()) { + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_beam_search_info(request->get_request_id()); + m_requests.pop_back(); + } + } + + std::vector get_generated_sequences() { + pull_awaiting_requests(); + std::vector result; + for (const auto& request : m_requests) { + const auto request_id = request->get_request_id(); + for (const auto& sequence : request->get_sequences()) { + auto generated_ids = sequence->get_generated_ids(); + auto log_probs = sequence->get_log_probs(); + result.emplace_back(request_id, sequence->get_grouped_id(), generated_ids, log_probs); + } + } + return result; + + } + + ContinuousBatchingPipeline::UpdateSeqResult + update_generated_sequence(const ContinuousBatchingPipeline::GeneratedSequence& candidate_sequence) { + pull_awaiting_requests(); + bool is_empty_generated_tokens = false; + for (auto& request : m_requests) { + if (candidate_sequence.request_id == request->get_request_id()) { + bool is_seq_exists = false; + // todo: iefode: multiseq + size_t to_remove_tokens = 0, to_insert_tokens = 0; + for (auto& sequence : request->get_sequences()) { + if (candidate_sequence.sequence_id == sequence->get_grouped_id()) { + is_seq_exists = true; + auto present_ids = sequence->get_generated_ids(); + const auto& candidate_ids = candidate_sequence.token_ids; + + // remove extra tokens from sequence + { + auto token_idx = std::min(present_ids.size(), candidate_ids.size()); + if (token_idx) { + while (token_idx-- > 0) { + if (present_ids[token_idx] == candidate_ids[token_idx]) { + break; + } + } + to_remove_tokens = present_ids.size() - (token_idx + 1); + if (to_remove_tokens > 0) { + const auto gen_ids_before = sequence->get_generated_ids(); + sequence->remove_last_n_tokens(to_remove_tokens); + present_ids = sequence->get_generated_ids(); + const size_t gen_len_before = gen_ids_before.size(), + gen_len_after = present_ids.size(); + if (gen_len_after == 0) { + is_empty_generated_tokens = true; + } + OPENVINO_ASSERT(gen_len_after < gen_len_before); + for (size_t i = gen_len_after; i < gen_len_before; ++i) { + m_sampler->update_logit_processor(request->get_request_id(), gen_ids_before[i]); + } + } + } + } + // insert new tokens to sequence + { + OPENVINO_ASSERT(candidate_ids.size() >= present_ids.size()); + const auto& candidate_log_probs = candidate_sequence.log_probs; + const size_t start_id = std::min(present_ids.size(), candidate_ids.size()), + 
stop_id = std::max(present_ids.size(), candidate_ids.size()); + to_insert_tokens = stop_id - start_id; + for (size_t i = start_id; i < stop_id; ++i) { + sequence->append_token(candidate_ids[i], i < candidate_log_probs.size() ? candidate_log_probs[i] : 0.f); + } + } + } + break; + } + if (!is_seq_exists) { + Sequence::Ptr new_sequence(new Sequence(candidate_sequence.sequence_id)); + const auto& generated_tokens = candidate_sequence.token_ids; + const auto& generated_log_probs = candidate_sequence.log_probs; + for (size_t i = 0; i < generated_tokens.size(); ++i) { + new_sequence->append_token(generated_tokens[i], generated_log_probs[i]); + } + request->add_sequence(new_sequence); + } + if (!is_empty_generated_tokens) { + // in case of non-prompt we need to take prev tokens + token to validate + if (request->get_num_processed_tokens()) + ++to_insert_tokens; + if (to_remove_tokens > 0) { + request->decrease_processed_tokens(to_remove_tokens); + } + // to validate tokens/extend kv-cache before generation + request->set_validation_len(to_insert_tokens); + } else if (to_remove_tokens > 0) { + request->update_processed_tokens_num(request->get_prompt_len()); + } + return ContinuousBatchingPipeline::UpdateSeqResult(to_insert_tokens, to_remove_tokens); + } + } + } }; ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::string& models_path, @@ -400,6 +535,16 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const ov::AnyMap& plugin_config ) : m_impl{std::make_shared(model_path, tokenizer, scheduler_config, device, plugin_config)} {} +ContinuousBatchingPipeline::ContinuousBatchingPipeline( + ov::Core& core, + const std::shared_ptr& model, + const ov::genai::Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config, + bool is_enable_validation_mode +) : m_impl{std::make_shared(core, model, tokenizer, scheduler_config, device, plugin_config, is_enable_validation_mode)} {} + ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() { return m_impl->get_tokenizer(); } @@ -443,3 +588,16 @@ void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { void ContinuousBatchingPipeline::finish_chat() { m_impl->finish_chat(); }; +void ContinuousBatchingPipeline::finish_all_requests() { + m_impl->finish_all_requests(); +} + +std::vector +ContinuousBatchingPipeline::get_generated_sequences() { + return m_impl->get_generated_sequences(); +} + +ContinuousBatchingPipeline::UpdateSeqResult +ContinuousBatchingPipeline::update_generated_sequence(const ContinuousBatchingPipeline::GeneratedSequence& new_sequence) { + return m_impl->update_generated_sequence(new_sequence); +} diff --git a/src/cpp/src/paged_attention_transformations.cpp b/src/cpp/src/paged_attention_transformations.cpp index 3f343048ea..57d8c655fd 100644 --- a/src/cpp/src/paged_attention_transformations.cpp +++ b/src/cpp/src/paged_attention_transformations.cpp @@ -16,12 +16,13 @@ inline ov::PartialShape to_partial_with_dyn_0_dim(const ov::Shape& static_shape) return partial_shape; } -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config) { +void apply_paged_attention_transformations(std::shared_ptr model) { const ov::op::util::VariableVector& variables = model->get_variables(); OPENVINO_ASSERT(!variables.empty(), "Model is supposed to be stateful"); - ov::pass::SDPAToPagedAttention().run_on_model(model); +} +void set_type_and_shape_to_kv_cache(std::shared_ptr model, DeviceConfig& device_config) { 
     const ov::ParameterVector& parameters = model->get_parameters();
 
     size_t num_layers = std::count_if(parameters.begin(), parameters.end(), [](std::shared_ptr<ov::op::v0::Parameter> parameter) {
diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp
index caf8cd2cc4..0384102667 100644
--- a/src/cpp/src/scheduler.hpp
+++ b/src/cpp/src/scheduler.hpp
@@ -64,7 +64,7 @@ class Scheduler {
 
     void clean_empty_blocks(std::vector<SequenceGroup::Ptr>& seq_groups) {
         for (const auto& seq_group : seq_groups)
-            m_block_manager.free_group_partially(seq_group, m_block_manager.num_free_blocks());
+            m_block_manager.free_empty_physical_blocks(seq_group);
    }
 
     const std::vector<KVCacheBlock::Ptr>& get_block_table(const Sequence& seq) {
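
Reviewer note: the n-gram candidate search inside PromptLookupPipeline::step() is easier to follow in isolation. Below is a minimal, self-contained C++ sketch of the same idea; the helper name find_candidates and the sample token values are illustrative only and are not part of this patch.

// Sketch of prompt-lookup candidate generation (illustrative, not part of the patch).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Find the longest trailing n-gram (up to max_ngram_size) that also occurs earlier
// in input_ids and return up to candidates_number tokens that followed that match.
std::vector<int64_t> find_candidates(const std::vector<int64_t>& input_ids,
                                     size_t max_ngram_size,
                                     size_t candidates_number) {
    const size_t input_length = input_ids.size();
    for (size_t ngram_size = std::min(max_ngram_size, input_length); ngram_size > 0; --ngram_size) {
        std::vector<int64_t> ngram(input_ids.end() - ngram_size, input_ids.end());
        // scan earlier positions only, so the trailing n-gram never matches itself
        for (size_t start = 0; start + ngram_size < input_length; ++start) {
            if (std::equal(ngram.begin(), ngram.end(), input_ids.begin() + start)) {
                const size_t continuation = start + ngram_size;
                const size_t available = std::min(input_length - continuation, candidates_number);
                return std::vector<int64_t>(input_ids.begin() + continuation,
                                            input_ids.begin() + continuation + available);
            }
        }
    }
    return {};  // no match: nothing to speculate on this step
}

int main() {
    const std::vector<int64_t> tokens = {5, 6, 7, 8, 9, 5, 6, 7};
    for (int64_t t : find_candidates(tokens, 3, 4))
        std::cout << t << " ";  // prints "8 9 5 6": the continuation of the earlier "5 6 7"
    std::cout << std::endl;
    return 0;
}

In the patch itself the matched continuation is written into updated_input.token_ids (with zero log-probs) and handed to model_pipeline.update_generated_sequence(), so the next ContinuousBatchingPipeline::step() call validates those candidate tokens and appends one newly generated token in a single pass.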