diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt
index 5339817c1f..9fc0642275 100644
--- a/samples/CMakeLists.txt
+++ b/samples/CMakeLists.txt
@@ -6,6 +6,9 @@ add_subdirectory(cpp/beam_search_causal_lm)
 add_subdirectory(cpp/chat_sample)
 add_subdirectory(cpp/continuous_batching_accuracy)
 add_subdirectory(cpp/continuous_batching_benchmark)
+# todo: iefode
+# add_subdirectory(cpp/continuous_batching_prompt_lookup)
+add_subdirectory(cpp/continuous_batching_speculative_decoding)
 add_subdirectory(cpp/greedy_causal_lm)
 add_subdirectory(cpp/multinomial_causal_lm)
 add_subdirectory(cpp/prompt_lookup_decoding_lm)
diff --git a/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt b/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt
new file mode 100644
index 0000000000..b122782a2e
--- /dev/null
+++ b/samples/cpp/continuous_batching_prompt_lookup/CMakeLists.txt
@@ -0,0 +1,25 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+# start of dependencies
+
+include(FetchContent)
+
+FetchContent_Declare(cxxopts
+    URL https://github.com/jarro2783/cxxopts/archive/refs/tags/v3.1.1.tar.gz
+    URL_HASH SHA256=523175f792eb0ff04f9e653c90746c12655f10cb70f1d5e6d6d9491420298a08)
+
+FetchContent_Declare(nlohmann_json
+    URL https://github.com/nlohmann/json/archive/refs/tags/v3.11.3.tar.gz
+    URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406)
+
+FetchContent_MakeAvailable(cxxopts)
+FetchContent_MakeAvailable(nlohmann_json)
+
+find_package(OpenVINO REQUIRED COMPONENTS Runtime)
+
+# end of dependencies
+
+set(TARGET_NAME continuous_batching_prompt_lookup)
+add_executable(${TARGET_NAME} ${TARGET_NAME}.cpp "prompt_lookup_pipeline.hpp" "prompt_lookup_pipeline.cpp")
+target_link_libraries(${TARGET_NAME} PRIVATE openvino::genai cxxopts::cxxopts)
\ No newline at end of file
diff --git a/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp b/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp
new file mode 100644
index 0000000000..3219d2e74f
--- /dev/null
+++ b/samples/cpp/continuous_batching_prompt_lookup/continuous_batching_prompt_lookup.cpp
@@ -0,0 +1,138 @@
+// Copyright (C) 2023-2024 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+#include <openvino/openvino.hpp>
+#include <cxxopts.hpp>
+
+#include "openvino/genai/generation_config.hpp"
+
+#include "prompt_lookup_pipeline.hpp"
+
+void print_generation_result(const ov::genai::GenerationResult& generation_result) {
+    for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) {
+        std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl;
+    }
+}
+
+int main(int argc, char* argv[]) try {
+    // Command line options
+
+    cxxopts::Options options("accuracy_sample", "Help command");
+
+    options.add_options()
+    ("n,num_prompts", "A number of prompts", cxxopts::value<size_t>()->default_value("1"))
+    ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", cxxopts::value<bool>()->default_value("false"))
+    ("m,model", "Path to model and tokenizers base directory", cxxopts::value<std::string>()->default_value("."))
+    ("k,candidates_number", "candidates_number", cxxopts::value<size_t>()->default_value("5"))
+    ("ngram", "Ngram", cxxopts::value<size_t>()->default_value("5"))
+    ("g,generated_len", "generated_len", cxxopts::value<size_t>()->default_value("30"))
+    ("h,help", "Print usage");
+
+    cxxopts::ParseResult result;
+    try {
+        result = options.parse(argc,
argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cout << e.what() << "\n\n"; + std::cout << options.help() << std::endl; + return EXIT_FAILURE; + } + + if (result.count("help")) { + std::cout << options.help() << std::endl; + return EXIT_SUCCESS; + } + + const size_t num_prompts = result["num_prompts"].as(); + const bool dynamic_split_fuse = result["dynamic_split_fuse"].as(); + const std::string models_path = result["model"].as(); + const size_t k = result["candidates_number"].as(); + const size_t g = result["generated_len"].as(); + const size_t n = result["ngram"].as(); + + // create dataset + + std::vector prompt_examples = { + // "What is OpenVINO?", + // "How are you?", + "code: ```for (const auto& a : b) { std::cout << a << std::endl; }```", + "Tell me something about Canada", + "What is OpenVINO?", + }; + + auto greedy = ov::genai::greedy(); + greedy.max_new_tokens = g; + + std::vector sampling_params_examples { + // ov::genai::beam_search(), + greedy, + // ov::genai::multinomial(), + }; + + std::vector prompts(num_prompts); + std::vector sampling_params(num_prompts); + + for (size_t request_id = 0; request_id < num_prompts; ++request_id) { + prompts[request_id] = prompt_examples[request_id % prompt_examples.size()]; + sampling_params[request_id] = sampling_params_examples[request_id % sampling_params_examples.size()]; + } + + // Perform the inference + + ov::genai::SchedulerConfig scheduler_config; + // batch size + scheduler_config.max_num_batched_tokens = 256; + // cache params + scheduler_config.num_kv_blocks = 364; + scheduler_config.block_size = 32; + // mode - vLLM or dynamic_split_fuse + scheduler_config.dynamic_split_fuse = dynamic_split_fuse; + // vLLM specific params + scheduler_config.max_num_seqs = 2; + + // It's possible to construct a Tokenizer from a different path. + // If the Tokenizer isn't specified, it's loaded from the same folder. + PromptLookupPipeline pipe(models_path, k, n, ov::genai::Tokenizer{models_path}, scheduler_config, "CPU"); + auto start_time = std::chrono::system_clock::now(); + std::vector generation_results = pipe.generate(prompts, sampling_params); + + for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) { + const ov::genai::GenerationResult & generation_result = generation_results[request_id]; + std::cout << "Question: " << prompts[request_id] << std::endl; + switch (generation_result.m_status) + { + case ov::genai::GenerationStatus::FINISHED: + print_generation_result(generation_result); + break; + case ov::genai::GenerationStatus::IGNORED: + std::cout << "Request was ignored due to lack of memory." < 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + std::cout << "Request was aborted." 
< 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + default: + break; + } + std::cout << std::endl; + } + auto end_time = std::chrono::system_clock::now(); + std::chrono::duration duration = end_time - start_time; + std::cout << std::endl; + std::cout << "Duration: " << duration.count() << std::endl; + std::cout << "Infer number: " << pipe.infer_cnt << std::endl; + std::cout << "MAX matches number: " << pipe.max_matches << std::endl; + std::cout << "AVG matches number: " << (float(pipe.avg_matches) / pipe.infer_cnt) << std::endl; +} catch (const std::exception& error) { + std::cerr << error.what() << '\n'; + return EXIT_FAILURE; +} catch (...) { + std::cerr << "Non-exception object thrown\n"; + return EXIT_FAILURE; +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp new file mode 100644 index 0000000000..af95d62f4b --- /dev/null +++ b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.cpp @@ -0,0 +1,189 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "prompt_lookup_pipeline.hpp" + +PromptLookupPipeline::PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t ngram_size, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + ov::genai::Tokenizer tokenizer(models_path); + PromptLookupPipeline(models_path, candidates_number, max_ngram_size, tokenizer, scheduler_config, device, plugin_config); +}; + +PromptLookupPipeline::PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t ngram_size, + const ov::genai::Tokenizer& tokenizer, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + m_tokenizer = tokenizer; + set_k(candidates_number); + max_ngram_size = ngram_size; + + model_pipeline = ov::genai::ContinuousBatchingPipeline(models_path, m_tokenizer, scheduler_config, device, plugin_config); + model_pipeline.enable_validation_mode(); +} + +ov::genai::PipelineMetrics PromptLookupPipeline::get_metrics() const { + return model_pipeline.get_metrics(); +} + +void PromptLookupPipeline::step() { + std::cout << "=======STEP==================" << std::endl; + bool is_updated = false; + if (is_speculative_mode) { + // predict tokens using prompt + std::cout << "num_candidates: " << candidates_number << std::endl; + for (const auto& whole_input : model_pipeline.get_prompts_with_generated_tokens()) { + auto updated_input = whole_input; + const auto& input_ids = whole_input.token_ids; + const size_t input_length = input_ids.size(); + for (int32_t ngram_size = max_ngram_size; ngram_size > 0; ngram_size--) { + std::vector ngram = std::vector{input_ids.cend() - ngram_size, input_ids.cend()}; + std::cout << "ngram: " << std::endl; + for (const auto& a : ngram) { + std::cout << a; + } + std::cout << std::endl; + + // find ngram match in input_ids + size_t ngram_i = 0; + for (size_t input_i = 0; input_i < input_length - ngram_size; input_i++) { + if (ngram[ngram_i] != input_ids[input_i]) { + ngram_i = 0; + continue; + } + ngram_i++; + + if (ngram_i < ngram_size) { + continue; + } + + // match found with the end at input_i + size_t avaliable_num_pred = std::min(input_length - (input_i + 1), candidates_number); + + // return candidates with length 
of avaliable_num_pred + std::vector candidate{input_ids.cbegin() + input_i + 1, + input_ids.cbegin() + input_i + 1 + avaliable_num_pred}; + updated_input.token_ids = candidate; + updated_input.log_probs = std::vector(candidate.size(), 0); + + model_pipeline.update_generated_sequence(updated_input); + break; + } + if (whole_input.token_ids != updated_input.token_ids) { + is_updated = true; + break; + } + } + } + + // put candidates to model cache + auto candidate_sequences = model_pipeline.get_generated_sequences(); + // todo: remove debug code + for (const auto& s : candidate_sequences) { + std::cout << "ASSISTANT: "; + for (const auto& d : s.token_ids) { + std::cout << d << " "; + } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + std::cout << std::endl; + std::cout << decode(s.token_ids) << std::endl; + } + } + + const auto gen_seq_before = model_pipeline.get_generated_sequences(); + + // validate candidates and generate 1 new token + model_pipeline.step(); + + if (is_speculative_mode && is_updated) { + // todo: remove debug code + for (const auto& s : model_pipeline.get_generated_sequences()) { + std::cout << "MODEL: "; + for (const auto& d : s.token_ids) { + std::cout << d << " "; + } + // std::cout << std::endl; + // for (const auto& d : s.log_probs) { + // std::cout << d << " "; + // } + std::cout << std::endl; + std::cout << decode(s.token_ids) << std::endl; + std::cout << std::endl; + } + + // todo: iefode: remove debug prints + for (const auto& gen_seq_after : model_pipeline.get_generated_sequences()) { + const auto& candidate_seq = gen_seq_before[gen_seq_after.request_id]; + size_t before_len = candidate_seq.token_ids.size(), + after_len = gen_seq_after.token_ids.size(); + size_t dist = is_updated ? (after_len <= before_len ? (before_len - after_len) : candidates_number) : 0; + update_strategy(dist); + } + // ov::genai::ContinuousBatchingPipeline::UpdateSeqResult update_result; + // for (const auto& checked_sequence : checked_sequences) { + // update_result = assisting_pipeline.update_generated_sequence(checked_sequence); + // } + + // OPENVINO_ASSERT(candidates_number >= update_result.to_remove); + // if (update_result.to_remove) { + // std::cout << "to_remove: " << update_result.to_remove << std::endl; + // } + // update_strategy(candidates_number - update_result.to_remove); + // std::cout << "=========================" << std::endl; + } +} + +void PromptLookupPipeline::update_strategy(size_t num_matches) { + std::cout << "num_matches: " << num_matches << std::endl; + max_matches = std::max(max_matches, num_matches); + avg_matches += num_matches; + if (max_candidates_number == 0) { + return; + } + if (num_matches == candidates_number) { + candidates_number = std::min(candidates_number + 2, max_candidates_number); + } else { + candidates_number = std::max(int64_t(candidates_number) - 1, int64_t(1)); + } +} + + +void PromptLookupPipeline::set_k(size_t new_default_k) { + candidates_number = new_default_k; + max_candidates_number = new_default_k * 2; + is_speculative_mode = candidates_number > 0; +} + +bool PromptLookupPipeline::has_non_finished_requests() { + return model_pipeline.has_non_finished_requests(); +} + + +std::vector +PromptLookupPipeline::generate_sequences( + const std::vector prompts, + std::vector sampling_params) { + OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. 
Use ContinuousBatchingPipeline::add_request"); + OPENVINO_ASSERT(prompts.size() == sampling_params.size()); + + std::vector generations, assisting_generations; + for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { + generations.push_back(model_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id])); + } + + while (has_non_finished_requests()) { + step(); + infer_cnt++; + } + + return generations; +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp new file mode 100644 index 0000000000..c48f70bb9e --- /dev/null +++ b/samples/cpp/continuous_batching_prompt_lookup/prompt_lookup_pipeline.hpp @@ -0,0 +1,46 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "openvino/genai/cb_basic_pipeline.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +class PromptLookupPipeline : public ov::genai::BasicPipeline { + ov::genai::ContinuousBatchingPipeline model_pipeline; + size_t candidates_number = 0, max_candidates_number = 0, max_ngram_size = 0; + bool is_speculative_mode = false; + std::map> m_encoded_prompt; + + std::vector generate_sequences( + const std::vector prompts, std::vector sampling_params) override; + + +public: + size_t infer_cnt = 0, max_matches = 0, avg_matches = 0; + + PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t max_ngram_size, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + PromptLookupPipeline(const std::string& models_path, + size_t candidates_number, + size_t max_ngram_size, + const ov::genai::Tokenizer& tokenizer, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + ov::genai::PipelineMetrics get_metrics() const override; + + void step() override; + + bool has_non_finished_requests() override; + + void set_k(size_t new_default_k); + + void update_strategy(size_t num_matches); +}; \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt b/samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt new file mode 100644 index 0000000000..b6b8487b74 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +# start of dependencies + +include(FetchContent) + +FetchContent_Declare(cxxopts + URL https://github.com/jarro2783/cxxopts/archive/refs/tags/v3.1.1.tar.gz + URL_HASH SHA256=523175f792eb0ff04f9e653c90746c12655f10cb70f1d5e6d6d9491420298a08) + +FetchContent_Declare(nlohmann_json + URL https://github.com/nlohmann/json/archive/refs/tags/v3.11.3.tar.gz + URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406) + +FetchContent_MakeAvailable(cxxopts) +FetchContent_MakeAvailable(nlohmann_json) + +find_package(OpenVINO REQUIRED COMPONENTS Runtime) + +# end of dependencies + +set(TARGET_NAME continuous_batching_speculative_decoding) +add_executable(${TARGET_NAME} ${TARGET_NAME}.cpp "speculative_decoding_pipeline.hpp" "speculative_decoding_pipeline.cpp") +target_link_libraries(${TARGET_NAME} PRIVATE openvino::genai cxxopts::cxxopts) \ No newline at end of file diff --git 
a/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp new file mode 100644 index 0000000000..782003d789 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/continuous_batching_speculative_decoding.cpp @@ -0,0 +1,130 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include "openvino/genai/generation_config.hpp" +#include "speculative_decoding_pipeline.hpp" + +void print_generation_result(const ov::genai::GenerationResult& generation_result) { + for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) { + std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl; + } +} + +int main(int argc, char* argv[]) try { + // Command line options + + cxxopts::Options options("accuracy_sample", "Help command"); + + options.add_options() + ("n,num_prompts", "A number of prompts", cxxopts::value()->default_value("1")) + ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", cxxopts::value()->default_value("false")) + ("m,model", "Path to model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("a,assisting_model", "Path to assisting model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("k,candidates_number", "candidates_number", cxxopts::value()->default_value("5")) + ("g,generated_len", "generated_len", cxxopts::value()->default_value("30")) + ("h,help", "Print usage"); + + cxxopts::ParseResult result; + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cout << e.what() << "\n\n"; + std::cout << options.help() << std::endl; + return EXIT_FAILURE; + } + + if (result.count("help")) { + std::cout << options.help() << std::endl; + return EXIT_SUCCESS; + } + + const size_t num_prompts = result["num_prompts"].as(); + const bool dynamic_split_fuse = result["dynamic_split_fuse"].as(); + const std::string models_path = result["model"].as(); + const std::string assisting_models_path = result["assisting_model"].as(); + const size_t k = result["candidates_number"].as(); + const size_t g = result["generated_len"].as(); + + // create dataset + + std::vector prompt_examples = { + "The United Arab Emirates[c] (UAE), or simply the Emirates,[d] is a country in West Asia, in the Middle East, at the eastern end of the Arabian Peninsula. 
It is a federal, elective monarchy composed of seven emirates, with Abu Dhabi as its capital.[13] It shares land borders with Oman to the east and northwest, and with Saudi Arabia to the southwest; as well as maritime borders in the Persian Gulf with Qatar and Iran, and with Oman in the Gulf of Oman.", + "What is OpenVINO?", + "How are you?", + "What is your name?", + "Tell me something about Canada", + "What is OpenVINO?", + }; + + auto greedy = ov::genai::greedy(); + greedy.max_new_tokens = g; + + std::vector sampling_params_examples { + // ov::genai::beam_search(), + greedy, + // ov::genai::multinomial(), + }; + + std::vector prompts(num_prompts); + std::vector sampling_params(num_prompts); + + for (size_t request_id = 0; request_id < num_prompts; ++request_id) { + prompts[request_id] = prompt_examples[request_id % prompt_examples.size()]; + sampling_params[request_id] = sampling_params_examples[request_id % sampling_params_examples.size()]; + } + + // Perform the inference + + ov::genai::SchedulerConfig scheduler_config; + // batch size + scheduler_config.max_num_batched_tokens = 32; + // cache params + scheduler_config.num_kv_blocks = 364; + scheduler_config.block_size = 32; + // mode - vLLM or dynamic_split_fuse + scheduler_config.dynamic_split_fuse = dynamic_split_fuse; + // vLLM specific params + scheduler_config.max_num_seqs = 2; + + // It's possible to construct a Tokenizer from a different path. + // If the Tokenizer isn't specified, it's loaded from the same folder. + SpeculativeDecodingPipeline pipe(models_path, assisting_models_path, k, scheduler_config, "CPU"); + std::vector generation_results = pipe.generate(prompts, sampling_params); + + for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) { + const ov::genai::GenerationResult & generation_result = generation_results[request_id]; + std::cout << "Question: " << prompts[request_id] << std::endl; + switch (generation_result.m_status) + { + case ov::genai::GenerationStatus::FINISHED: + print_generation_result(generation_result); + break; + case ov::genai::GenerationStatus::IGNORED: + std::cout << "Request was ignored due to lack of memory." < 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + std::cout << "Request was aborted." < 0) { + std::cout << "Partial result:" << std::endl; + print_generation_result(generation_result); + } + break; + default: + break; + } + std::cout << std::endl; + } +} catch (const std::exception& error) { + std::cerr << error.what() << '\n'; + return EXIT_FAILURE; +} catch (...) 
{ + std::cerr << "Non-exception object thrown\n"; + return EXIT_FAILURE; +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp new file mode 100644 index 0000000000..c9dab81f12 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.cpp @@ -0,0 +1,176 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "speculative_decoding_pipeline.hpp" + +inline size_t get_kv_cache_size(const std::shared_ptr& model) { + const auto& parameters = model->get_parameters(); + // extract num_kv_heads and head_size + size_t kv_caches_inputs_offset = 2; + ov::PartialShape k_shape = parameters[kv_caches_inputs_offset]->get_partial_shape(); + OPENVINO_ASSERT(k_shape.rank().get_length() == 3, "KV cache shape is expected to have rank 3, while shape is ", k_shape); + size_t num_kv_heads = k_shape[1].get_length(), head_size = k_shape[2].get_length(); + return num_kv_heads * head_size; +} + +SpeculativeDecodingPipeline::SpeculativeDecodingPipeline( + const std::string& models_path, + const std::string& speculative_models_path, + size_t start_candidates_number, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config) { + m_tokenizer = ov::genai::Tokenizer(models_path, plugin_config); + + m_candidates_num = start_candidates_number; + m_max_candidates_num = m_candidates_num * 2; + m_is_speculative_mode = m_candidates_num > 0; + + ov::Core core; + std::shared_ptr model = ov::genai::read_model_and_apply_paged_attention(models_path, core), + assisting_model = ov::genai::read_model_and_apply_paged_attention(speculative_models_path, core); + + ov::genai::SchedulerConfig model_scheduler_config = scheduler_config, + assisting_scheduler_config = scheduler_config; + if (m_is_speculative_mode) { + // split KV cache to 2 caches for speculative and base parts + size_t model_cache_size = get_kv_cache_size(model), + assisting_cache_size = get_kv_cache_size(assisting_model); + auto k = float(assisting_cache_size) / (model_cache_size + assisting_cache_size); + auto cache_size = scheduler_config.num_kv_blocks; + auto assisted_cache_size = size_t(cache_size * k); + cache_size -= assisted_cache_size; + model_scheduler_config.num_kv_blocks = cache_size; + assisting_scheduler_config.num_kv_blocks = assisted_cache_size; + m_speculative_pipeline = ov::genai::ContinuousBatchingPipeline(core, assisting_model, m_tokenizer, assisting_scheduler_config, device, plugin_config, false); + } + + m_pipeline = ov::genai::ContinuousBatchingPipeline(core, model, m_tokenizer, model_scheduler_config, device, plugin_config, true); +} + +void SpeculativeDecodingPipeline::step() { + std::vector candidate_sequences; + if (m_is_speculative_mode) { + // find minimum(candidates_number, seq_len) to generate candidates + size_t min_candidates_number = m_candidates_num; + for (auto& request : m_to_generate_length) { + if (request.second < min_candidates_number && request.second > 0) { + min_candidates_number = request.second; + } + } + // generate candidates by speculative model + for (size_t i = 0; i < min_candidates_number; ++i) { + m_speculative_pipeline.step(); + } + + // put candidates to model KV cache + candidate_sequences = m_speculative_pipeline.get_generated_sequences(); + for (const auto& candidate : candidate_sequences) { + 
m_pipeline.update_generated_sequence(candidate); + } + } + + // validate candidates and generate 1 new token + m_pipeline.step(); + + if (m_is_speculative_mode) { + auto checked_sequences = m_pipeline.get_generated_sequences(); + size_t max_removed_token_cnt = 0; + for (const auto& checked_sequence : checked_sequences) { + auto update_result = m_speculative_pipeline.update_generated_sequence(checked_sequence); + max_removed_token_cnt = std::max(max_removed_token_cnt, update_result.to_remove); + } + OPENVINO_ASSERT(m_candidates_num >= max_removed_token_cnt); + auto num_matches = m_candidates_num - max_removed_token_cnt; + update_strategy(num_matches); + + // update to generate tokens + for (auto& request : m_to_generate_length) { + if (request.second > num_matches) { + request.second -= (num_matches + 1); + } else { + request.second = 0; + m_speculative_pipeline.finish_request(request.first); + } + } + } +} + +std::vector +SpeculativeDecodingPipeline::generate(const std::vector& prompts, + const std::vector& sampling_params) { + OPENVINO_ASSERT(!m_pipeline.has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request"); + OPENVINO_ASSERT(prompts.size() == sampling_params.size()); + + std::vector generations, speculative_generations; + for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { + generations.push_back(m_pipeline.add_request(request_id, prompts[request_id], sampling_params[request_id])); + m_to_generate_length.insert({ request_id, sampling_params[request_id].max_new_tokens }); + if (m_is_speculative_mode) { + auto assisting_sampling_params = sampling_params[request_id]; + assisting_sampling_params.max_new_tokens += m_max_candidates_num; + assisting_sampling_params.min_new_tokens += m_max_candidates_num; + speculative_generations.push_back(m_speculative_pipeline.add_request(request_id, prompts[request_id], assisting_sampling_params)); + } + } + + while (m_pipeline.has_non_finished_requests()) { + step(); + } + if (m_is_speculative_mode) { + // finish all speculative requests + m_speculative_pipeline.finish_request(-1); + } + + std::vector encoded_results; + for (size_t generation_idx = 0; generation_idx < generations.size(); ++generation_idx) { + const auto& generation = generations[generation_idx]; + ov::genai::EncodedGenerationResult result; + result.m_request_id = 1; + std::vector generation_outputs = generation->read_all(); + std::sort(generation_outputs.begin(), generation_outputs.end(), [=] (ov::genai::GenerationOutput& r1, ov::genai::GenerationOutput& r2) { + return r1.score > r2.score; + }); + + auto num_outputs = std::min(sampling_params[generation_idx].num_return_sequences, generation_outputs.size()); + for (size_t generation_output_idx = 0; generation_output_idx < num_outputs; ++generation_output_idx) { + const auto& generation_output = generation_outputs[generation_output_idx]; + result.m_generation_ids.push_back(std::move(generation_output.generated_token_ids)); + result.m_scores.push_back(generation_output.score); + } + result.m_status = generation->get_status(); + encoded_results.push_back(std::move(result)); + } + + OPENVINO_ASSERT(encoded_results.size() == prompts.size()); + + std::vector decoded_results; + for (ov::genai::EncodedGenerationResult& res : encoded_results) { + std::vector generated; + generated.reserve(res.m_generation_ids.size()); + for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { + 
generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); + } + decoded_results.push_back(ov::genai::GenerationResult{ + res.m_request_id, + std::move(generated), + std::move(res.m_scores), + res.m_status + }); + } + return decoded_results; +} + +void SpeculativeDecodingPipeline::update_strategy(const size_t num_matches) { + // dynamically adjust number of generated candidates based on number of matches + // we want to balance the benefits of getting candidates tokens correct with the + // cost of forecasting incorrect candidates tokens. + if (m_max_candidates_num == 0) { + return; + } + if (num_matches == m_candidates_num) { + m_candidates_num = std::min(m_candidates_num + 2, m_max_candidates_num); + } else { + m_candidates_num = std::max(int64_t(m_candidates_num) - 1, int64_t(1)); + } +} \ No newline at end of file diff --git a/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp new file mode 100644 index 0000000000..4f03b80aa2 --- /dev/null +++ b/samples/cpp/continuous_batching_speculative_decoding/speculative_decoding_pipeline.hpp @@ -0,0 +1,34 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +#include "openvino/genai/tokenizer.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +class SpeculativeDecodingPipeline { +protected: + ov::genai::Tokenizer m_tokenizer; + ov::genai::ContinuousBatchingPipeline m_pipeline, m_speculative_pipeline; + + bool m_is_speculative_mode = false; + size_t m_max_candidates_num = 0, m_candidates_num = 0; + std::map m_to_generate_length; + + void update_strategy(size_t num_matches); + +public: + SpeculativeDecodingPipeline(const std::string& models_path, + const std::string& speculative_model_path, + size_t start_candidates_number, + const ov::genai::SchedulerConfig& scheduler_config, + const std::string& device = "CPU", + const ov::AnyMap& plugin_config = {}); + + void step(); + std::vector + generate(const std::vector& prompts, const std::vector& sampling_params); +}; \ No newline at end of file diff --git a/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp b/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp index 09e6af65e8..158f6fa577 100644 --- a/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp +++ b/samples/cpp/greedy_causal_lm/greedy_causal_lm.cpp @@ -5,17 +5,25 @@ int main(int argc, char* argv[]) try { if (3 > argc) - throw std::runtime_error(std::string{"Usage: "} + argv[0] + " \"\""); + throw std::runtime_error(std::string{"Usage: "} + argv[0] + " \"\" "); std::string model_path = argv[1]; std::string prompt = argv[2]; + std::string len = argv[3]; std::string device = "CPU"; // GPU can be used as well ov::genai::LLMPipeline pipe(model_path, device); ov::genai::GenerationConfig config; - config.max_new_tokens = 100; + config.max_new_tokens = std::stoi(len); + auto start_time = std::chrono::system_clock::now(); std::string result = pipe.generate(prompt, config); + auto end_time = std::chrono::system_clock::now(); std::cout << result << std::endl; + + std::chrono::duration duration = end_time - start_time; + std::cout << std::endl; + std::cout << "Duration: " << duration.count() << std::endl; + std::cout << "Infer number: " << config.max_new_tokens << std::endl; } catch (const std::exception& error) { try { std::cerr << error.what() << '\n'; diff --git a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp 
b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp index f26cb6c7c4..f535ebb751 100644 --- a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp +++ b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp @@ -174,7 +174,6 @@ class AssistedCandidateGenerator { draft_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); draft_model.get_tensor("beam_idx").data()[0] = 0; - draft_model.infer(); auto logits = draft_model.get_tensor("logits"); @@ -250,8 +249,8 @@ int64_t get_eos_token(const std::shared_ptr tokenizer) { } // namespace int main(int argc, char* argv[]) try { - if (argc != 4) { - throw std::runtime_error(std::string{"Usage: "} + argv[0] + "
''"); + if (argc != 4 + 2) { + throw std::runtime_error(std::string{"Usage: "} + argv[0] + "
'' "); } // tokenizer model @@ -281,9 +280,9 @@ int main(int argc, char* argv[]) try { ov::InferRequest main_model = core.compile_model(ov_main_model, "CPU").create_infer_request(); - size_t max_sequence_length = 100; + size_t max_sequence_length = std::stoi(std::string{argv[5]}); - AssistedCandidateGenerator candidateGenerator{draft_model, max_sequence_length, 5, draft_model_seq_len_axis}; + AssistedCandidateGenerator candidateGenerator{draft_model, max_sequence_length, std::stoi(std::string{argv[4]}), draft_model_seq_len_axis}; main_model.set_tensor("input_ids", input_ids); main_model.set_tensor("attention_mask", attention_mask); @@ -296,11 +295,15 @@ int main(int argc, char* argv[]) try { main_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); main_model.get_tensor("beam_idx").data()[0] = 0; + auto start_time = std::chrono::system_clock::now(); + size_t iteration_cnt = 0; + // To coollect kv-cache for the and to get the next token run the very first infer request candidateGenerator.generate_next_token( std::vector(input_ids.data(), input_ids.data() + input_ids.get_size())); main_model.infer(); + ++iteration_cnt; size_t vocab_size = draft_model.get_tensor("logits").get_shape().back(); OPENVINO_ASSERT(vocab_size == main_model.get_tensor("logits").get_shape().back(), @@ -349,6 +352,7 @@ int main(int argc, char* argv[]) try { std::iota(position_ids.data(), position_ids.data() + position_ids.get_size(), seq_len); main_model.infer(); + ++iteration_cnt; data_logits = logits.data(); // [BATCH_SIZE, K, vocab_size] @@ -368,9 +372,8 @@ int main(int argc, char* argv[]) try { break; } - text_streamer.put(out_token); accepted_tokens_number++; - + text_streamer.put(out_token); if (i == candidates_size || out_token != candidates[i]) { break; } diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index 626a51c5da..2e8cc53ca6 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -29,6 +29,8 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { std::shared_ptr m_impl; public: + ContinuousBatchingPipeline() = default; + ContinuousBatchingPipeline(const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device = "CPU", @@ -52,6 +54,16 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { const ov::AnyMap& plugin_config={} ); + ContinuousBatchingPipeline( + ov::Core& core, + const std::shared_ptr& model, + const ov::genai::Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device="CPU", + const ov::AnyMap& plugin_config={}, + bool is_enable_validation_mode=false + ); + ov::genai::Tokenizer get_tokenizer(); ov::genai::GenerationConfig get_config() const; @@ -80,5 +92,31 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { * @brief finish chat and clear kv cache. 
*/ void finish_chat(); + + // for speculative decoding + void finish_request(int64_t request_id = -1); + + struct GeneratedSequence { + uint64_t request_id = 0, sequence_id = 0; + std::vector token_ids; + std::vector log_probs; + + GeneratedSequence(uint64_t req_id, uint64_t seq_id, const std::vector& generated_token_ids, const std::vector& generated_log_probs) : + request_id(req_id), + sequence_id(seq_id), + token_ids(generated_token_ids), + log_probs(generated_log_probs) {}; + }; + + struct UpdateSeqResult { + size_t to_insert, to_remove; + UpdateSeqResult(size_t _to_insert = 0, size_t _to_remove = 0) : to_insert(_to_insert), to_remove(_to_remove) {}; + }; + + std::vector get_generated_sequences(); + UpdateSeqResult update_generated_sequence(const GeneratedSequence& new_sequence); }; + +std::shared_ptr OPENVINO_GENAI_EXPORTS read_model_and_apply_paged_attention(const std::string& model_path, ov::Core& core); + } diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp index 0026ed60d6..5925beb13b 100644 --- a/src/cpp/src/block_manager.hpp +++ b/src/cpp/src/block_manager.hpp @@ -232,6 +232,9 @@ class BlockManager { auto running_sequences = sequence_group->get_not_finished_sequences(); for (size_t idx = 0; idx < running_sequences.size(); ++idx) { auto seq_id = running_sequences[idx]->get_id(); + if (m_block_table.count(seq_id) == 0) { + continue; + } OPENVINO_ASSERT(m_block_table.count(seq_id) > 0, "Invalid sequence group."); auto block_table = m_block_table[seq_id]; free_sequence_partially(seq_id, blocks_num); @@ -444,6 +447,18 @@ class BlockManager { return blocks_count; } + void free_empty_physical_blocks(SequenceGroup::Ptr seq_group) { + size_t num_logical_blocks = seq_group->get_num_logical_blocks(); + for (const auto& sequence : seq_group->get_running_sequences()) { + auto seq_id = sequence->get_id(); + auto& block_table = m_block_table[seq_id]; + size_t num_physical_blocks = block_table.size(); + if (num_physical_blocks > num_logical_blocks) { + free_sequence_partially(seq_id, num_physical_blocks - num_logical_blocks); + } + } + } + std::map> append_slots(SequenceGroup::Ptr seq_group) { size_t num_logical_blocks = seq_group->get_num_logical_blocks(); diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index 8044eddc6c..a6bce7a7bd 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -21,7 +21,15 @@ using namespace ov::genai; template struct overloaded : Ts... {using Ts::operator()...;}; template overloaded(Ts...) -> overloaded; -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config); +void apply_paged_attention_transformations(std::shared_ptr model); +void set_type_and_shape_to_kv_cache(std::shared_ptr model, DeviceConfig& device_config); + +std::shared_ptr +ov::genai::read_model_and_apply_paged_attention(const std::string& models_path, ov::Core& core) { + auto model = core.read_model(models_path + "/openvino_model.xml"); + apply_paged_attention_transformations(model); + return model; +} class ContinuousBatchingPipeline::Impl { ov::genai::Tokenizer m_tokenizer; @@ -29,6 +37,7 @@ class ContinuousBatchingPipeline::Impl { std::shared_ptr m_cache_manager; std::shared_ptr m_model_runner; std::shared_ptr m_sampler; + bool m_is_validation_mode_enabled = false; // TODO (mzegla): GenerationConfig is request specific object // and pipeline only uses default rng_seed. 
@@ -85,17 +94,14 @@ class ContinuousBatchingPipeline::Impl { } } -public: - Impl(const std::string& models_path, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) : - m_tokenizer{tokenizer} { - ov::Core core; - - // The model can be compiled for GPU as well - std::shared_ptr model = core.read_model(models_path + "/openvino_model.xml"); - - DeviceConfig device_config(core, scheduler_config, device, plugin_config); - - apply_paged_attention_transformations(model, device_config); + inline void + compile_model(std::shared_ptr model, + const SchedulerConfig& scheduler_config, + const ov::AnyMap& plugin_config, + const std::string& device, + ov::Core& core) { + DeviceConfig device_config(core, scheduler_config, device); + set_type_and_shape_to_kv_cache(model, device_config); ov::InferRequest infer_request = core.compile_model(model, device_config.get_device(), plugin_config).create_infer_request(); @@ -105,24 +111,49 @@ class ContinuousBatchingPipeline::Impl { infer_request.set_input_tensor(2 + decoder_layer_id * 2, m_cache_manager->get_key_cache(decoder_layer_id)); infer_request.set_input_tensor(2 + decoder_layer_id * 2 + 1, m_cache_manager->get_value_cache(decoder_layer_id)); } - SchedulerConfig updated_config = scheduler_config; // update KV number in scheduler config if (scheduler_config.num_kv_blocks != device_config.get_num_kv_blocks()) { updated_config.num_kv_blocks = device_config.get_num_kv_blocks(); } - m_scheduler = std::make_shared(updated_config); // and finally create model runner m_model_runner = std::make_shared(infer_request, updated_config); m_sampler = std::make_shared(); m_sampler->set_seed(m_generation_config.rng_seed); + m_sampler->set_seed(0); // read default generation config } + inline void pull_awaiting_requests() { + if (m_requests.empty()) { + // Pull awaiting requests + if (!m_awaiting_requests.empty()) { + std::lock_guard lock{m_awaiting_requests_mutex}; + m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end()); + m_awaiting_requests.clear(); + } + } + } + +public: + Impl(const std::string& models_path, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) : + m_tokenizer{tokenizer} { + ov::Core core; + // The model can be compiled for GPU as well + std::shared_ptr model = read_model_and_apply_paged_attention(models_path, core); + compile_model(model, scheduler_config, plugin_config, device, core); + } + Impl(const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& llm_plugin_config, const ov::AnyMap& tokenizer_plugin_config) : Impl{models_path, Tokenizer(models_path, tokenizer_plugin_config), scheduler_config, device, llm_plugin_config} {} + + Impl(ov::Core& core, std::shared_ptr model, const Tokenizer& tokenizer, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config, bool is_validation_mode = false) : + m_is_validation_mode_enabled(is_validation_mode), + m_tokenizer{tokenizer} { + compile_model(model, scheduler_config, plugin_config, device, core); + } ov::genai::GenerationConfig get_config() const { return m_generation_config; @@ -167,18 +198,15 @@ class ContinuousBatchingPipeline::Impl { static ManualTimer step_timer("step()"); step_timer.start(); - // Pull awaiting requests - { - std::lock_guard lock{m_awaiting_requests_mutex}; - m_requests.insert(m_requests.end(), 
m_awaiting_requests.begin(), m_awaiting_requests.end()); - m_awaiting_requests.clear(); - } + pull_awaiting_requests(); m_pipeline_metrics.requests = m_requests.size(); Scheduler::Output scheduler_output; { static ManualTimer timer("scheduling"); timer.start(); + // todo: iefode: to move to other place? + m_scheduler->clean_empty_blocks(m_requests); scheduler_output = m_scheduler->schedule(m_requests); m_pipeline_metrics.scheduled_requests = scheduler_output.m_scheduled_sequence_groups_ids.size(); m_pipeline_metrics.cache_usage = scheduler_output.m_cache_usage; @@ -221,7 +249,7 @@ class ContinuousBatchingPipeline::Impl { { static ManualTimer timer("sample"); timer.start(); - sampler_output = m_sampler->sample(m_requests, logits); + sampler_output = m_sampler->sample(m_requests, logits, m_is_validation_mode_enabled); timer.end(); } @@ -382,6 +410,129 @@ class ContinuousBatchingPipeline::Impl { m_is_chat_conversation = false; m_history.clear(); }; + + void finish_request(int64_t request_id) { + if (request_id == -1) { + while (!m_requests.empty()) { + const auto& request = *m_requests.rbegin(); + for (const auto& sequence : request->get_sequences()) { + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_beam_search_info(request->get_request_id()); + m_requests.pop_back(); + } + } else { + for (size_t i = 0; i < m_requests.size(); ++i) { + auto& request = m_requests[i]; + if (request->get_request_id() != request_id) { + continue; + } + for (const auto& sequence : request->get_sequences()) { + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_beam_search_info(request->get_request_id()); + m_requests.erase(m_requests.begin() + i); + break; + } + } + } + + std::vector get_generated_sequences() { + pull_awaiting_requests(); + std::vector result; + for (const auto& request : m_requests) { + const auto request_id = request->get_request_id(); + for (const auto& sequence : request->get_sequences()) { + auto generated_ids = sequence->get_generated_ids(); + auto log_probs = sequence->get_log_probs(); + result.emplace_back(request_id, sequence->get_grouped_id(), generated_ids, log_probs); + } + } + return result; + + } + + ContinuousBatchingPipeline::UpdateSeqResult + update_generated_sequence(const ContinuousBatchingPipeline::GeneratedSequence& candidate_sequence) { + pull_awaiting_requests(); + bool is_empty_generated_tokens = false; + for (auto& request : m_requests) { + if (candidate_sequence.request_id == request->get_request_id()) { + bool is_seq_exists = false; + // todo: iefode: multiseq + size_t to_remove_tokens = 0, to_insert_tokens = 0; + for (auto& sequence : request->get_sequences()) { + if (candidate_sequence.sequence_id == sequence->get_grouped_id()) { + is_seq_exists = true; + auto present_ids = sequence->get_generated_ids(); + const auto& candidate_ids = candidate_sequence.token_ids; + + // remove extra tokens from sequence + { + auto token_idx = std::min(present_ids.size(), candidate_ids.size()); + if (token_idx) { + while (token_idx-- > 0) { + if (present_ids[token_idx] == candidate_ids[token_idx]) { + break; + } + } + to_remove_tokens = present_ids.size() - (token_idx + 1); + if (to_remove_tokens > 0) { + const auto gen_ids_before = sequence->get_generated_ids(); + sequence->remove_last_n_tokens(to_remove_tokens); + present_ids = sequence->get_generated_ids(); + const size_t gen_len_before = gen_ids_before.size(), + gen_len_after = present_ids.size(); + if (gen_len_after == 0) { + is_empty_generated_tokens = true; + } + 
OPENVINO_ASSERT(gen_len_after < gen_len_before); + for (size_t i = gen_len_after; i < gen_len_before; ++i) { + m_sampler->update_logit_processor(request->get_request_id(), gen_ids_before[i]); + } + } + } + } + // insert new tokens to sequence + { + OPENVINO_ASSERT(candidate_ids.size() >= present_ids.size()); + const auto& candidate_log_probs = candidate_sequence.log_probs; + const size_t start_id = std::min(present_ids.size(), candidate_ids.size()), + stop_id = std::max(present_ids.size(), candidate_ids.size()); + to_insert_tokens = stop_id - start_id; + for (size_t i = start_id; i < stop_id; ++i) { + sequence->append_token(candidate_ids[i], i < candidate_log_probs.size() ? candidate_log_probs[i] : 0.f); + } + } + } + break; + } + if (!is_seq_exists) { + Sequence::Ptr new_sequence(new Sequence(candidate_sequence.sequence_id)); + const auto& generated_tokens = candidate_sequence.token_ids; + const auto& generated_log_probs = candidate_sequence.log_probs; + for (size_t i = 0; i < generated_tokens.size(); ++i) { + new_sequence->append_token(generated_tokens[i], generated_log_probs[i]); + } + request->add_sequence(new_sequence); + } + if (!is_empty_generated_tokens) { + // in case of non-prompt we need to take prev tokens + token to validate + if (request->get_num_processed_tokens()) + ++to_insert_tokens; + if (to_remove_tokens > 0) { + request->decrease_processed_tokens(to_remove_tokens); + } + // to validate tokens/extend kv-cache before generation + request->set_validation_len(to_insert_tokens); + } else if (to_remove_tokens > 0) { + request->update_processed_tokens_num(request->get_prompt_len()); + } + return ContinuousBatchingPipeline::UpdateSeqResult(to_insert_tokens, to_remove_tokens); + } + } + return {0, 0}; + } }; ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::string& models_path, @@ -400,6 +551,16 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const ov::AnyMap& plugin_config ) : m_impl{std::make_shared(model_path, tokenizer, scheduler_config, device, plugin_config)} {} +ContinuousBatchingPipeline::ContinuousBatchingPipeline( + ov::Core& core, + const std::shared_ptr& model, + const ov::genai::Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config, + bool is_enable_validation_mode +) : m_impl{std::make_shared(core, model, tokenizer, scheduler_config, device, plugin_config, is_enable_validation_mode)} {} + ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() { return m_impl->get_tokenizer(); } @@ -443,3 +604,16 @@ void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { void ContinuousBatchingPipeline::finish_chat() { m_impl->finish_chat(); }; +void ContinuousBatchingPipeline::finish_request(int64_t request_id) { + m_impl->finish_request(request_id); +} + +std::vector +ContinuousBatchingPipeline::get_generated_sequences() { + return m_impl->get_generated_sequences(); +} + +ContinuousBatchingPipeline::UpdateSeqResult +ContinuousBatchingPipeline::update_generated_sequence(const ContinuousBatchingPipeline::GeneratedSequence& new_sequence) { + return m_impl->update_generated_sequence(new_sequence); +} diff --git a/src/cpp/src/generation_config.cpp b/src/cpp/src/generation_config.cpp index f84600eb33..d05a62809c 100644 --- a/src/cpp/src/generation_config.cpp +++ b/src/cpp/src/generation_config.cpp @@ -161,9 +161,9 @@ GenerationConfig greedy() { greedy_config.temperature = 0.0f; greedy_config.ignore_eos = true; greedy_config.num_return_sequences = 1; - 
greedy_config.repetition_penalty = 3.0f; - greedy_config.presence_penalty = 0.1f; - greedy_config.frequency_penalty = 0.01f; + // greedy_config.repetition_penalty = 3.0f; + // greedy_config.presence_penalty = 0.1f; + // greedy_config.frequency_penalty = 0.01f; greedy_config.max_new_tokens = 30; return greedy_config; } diff --git a/src/cpp/src/logit_processor.hpp b/src/cpp/src/logit_processor.hpp index bdd15f429c..853224b2bd 100644 --- a/src/cpp/src/logit_processor.hpp +++ b/src/cpp/src/logit_processor.hpp @@ -367,6 +367,15 @@ class LogitProcessor { ++m_generated_tokens; } + void decrement_gen_tokens() { + OPENVINO_ASSERT(m_generated_tokens > 0); + --m_generated_tokens; + } + + size_t get_gen_token_len() { + return m_generated_tokens; + } + void register_new_generated_token(int64_t new_token_id) { auto it = m_unique_generated_token_ids->find(new_token_id); if (it == m_unique_generated_token_ids->end()) { @@ -375,4 +384,9 @@ class LogitProcessor { it->second++; } } + + void decrease_generated_token_occurance(int64_t token_id) { + OPENVINO_ASSERT(m_unique_generated_token_ids->count(token_id) > 0); + m_unique_generated_token_ids->at(token_id)--; + } }; diff --git a/src/cpp/src/paged_attention_transformations.cpp b/src/cpp/src/paged_attention_transformations.cpp index 3f343048ea..57d8c655fd 100644 --- a/src/cpp/src/paged_attention_transformations.cpp +++ b/src/cpp/src/paged_attention_transformations.cpp @@ -16,12 +16,13 @@ inline ov::PartialShape to_partial_with_dyn_0_dim(const ov::Shape& static_shape) return partial_shape; } -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config) { +void apply_paged_attention_transformations(std::shared_ptr model) { const ov::op::util::VariableVector& variables = model->get_variables(); OPENVINO_ASSERT(!variables.empty(), "Model is supposed to be stateful"); - ov::pass::SDPAToPagedAttention().run_on_model(model); +} +void set_type_and_shape_to_kv_cache(std::shared_ptr model, DeviceConfig& device_config) { const ov::ParameterVector& parameters = model->get_parameters(); size_t num_layers = std::count_if(parameters.begin(), parameters.end(), [](std::shared_ptr parameter) { diff --git a/src/cpp/src/sampler.hpp b/src/cpp/src/sampler.hpp index 2670e50d03..9d62cd24b6 100644 --- a/src/cpp/src/sampler.hpp +++ b/src/cpp/src/sampler.hpp @@ -206,12 +206,13 @@ class GroupBeamSearcher { class Sampler { - Logits _get_logit_vector(ov::Tensor logits, size_t batch_idx = 1) { + Logits _get_logit_vector(ov::Tensor logits, size_t batch_idx = 1, size_t token_offset = 0) { ov::Shape logits_shape = logits.get_shape(); size_t batch_size = logits_shape[0], seq_len = logits_shape[1], vocab_size = logits_shape[2]; OPENVINO_ASSERT(batch_idx <= batch_size); + OPENVINO_ASSERT(token_offset < seq_len); size_t batch_offset = batch_idx * seq_len * vocab_size; - size_t sequence_offset = (seq_len - 1) * vocab_size; + size_t sequence_offset = (seq_len - token_offset - 1) * vocab_size; float* logits_data = logits.data() + batch_offset + sequence_offset; return Logits{logits_data, vocab_size}; @@ -261,14 +262,21 @@ class Sampler { std::map m_logit_processors; public: - SamplerOutput sample(std::vector & sequence_groups, ov::Tensor logits); + SamplerOutput sample(std::vector & sequence_groups, ov::Tensor logits, bool is_validation_mode_enabled = false); void set_seed(size_t seed) { rng_engine.seed(seed); } void clear_beam_search_info(uint64_t request_id); + + void update_logit_processor(uint64_t request_id, uint64_t token_id) { + 
OPENVINO_ASSERT(m_logit_processors.count(request_id)); + auto& logit_processor = m_logit_processors.at(request_id); + logit_processor.decrease_generated_token_occurance(token_id); + logit_processor.decrement_gen_tokens(); + } }; -SamplerOutput Sampler::sample(std::vector & sequence_groups, ov::Tensor logits) { +SamplerOutput Sampler::sample(std::vector & sequence_groups, ov::Tensor logits, bool is_validation_mode_enabled) { const float * logits_data = logits.data(); ov::Shape logits_shape = logits.get_shape(); OPENVINO_ASSERT(logits_shape.size() == 3); @@ -295,44 +303,93 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, const void * sequence_group_logits_data = logits_data + vocab_size * currently_processed_tokens; ov::Tensor sequence_group_logits(ov::element::f32, ov::Shape{num_running_sequences, actual_seq_len, vocab_size}, (void *)sequence_group_logits_data); + size_t decrease_len = 0; if (sequence_group->requires_sampling()) { + auto token_id = actual_seq_len; + // prompt phase + if (sequence_group->get_num_processed_tokens() == 0) { + token_id -= (sequence_group->get_prompt_len() - 1); + } if (sampling_params.is_greedy_decoding() || sampling_params.is_multinomial()) { std::vector running_sequences = sequence_group->get_running_sequences(); - if (sampling_params.is_greedy_decoding()) { - OPENVINO_ASSERT(num_running_sequences == 1); + // limitation for speculative decoding + if (is_validation_mode_enabled || sampling_params.is_greedy_decoding()) { + OPENVINO_ASSERT(running_sequences.size() == 1); } - auto register_new_token = [&](const Token& sampled_token_id, Sequence::Ptr running_sequence) { + auto register_new_token = [&](const Token& sampled_token_id, size_t running_sequence_id, bool is_extend_sequence = true) { logit_processor.register_new_generated_token(sampled_token_id.m_index); - running_sequence->append_token(sampled_token_id.m_index, sampled_token_id.m_log_prob); + // increment seq len only for one sequence in sequence group to sync them + if (is_extend_sequence) { + running_sequences[running_sequence_id]->append_token(sampled_token_id.m_index, sampled_token_id.m_log_prob); + } else { + OPENVINO_ASSERT(logit_processor.get_gen_token_len() < running_sequences[running_sequence_id]->get_generated_len()); + running_sequences[running_sequence_id]->update_log_prob(sampled_token_id.m_log_prob, logit_processor.get_gen_token_len()); + } + if (running_sequence_id == num_running_sequences - 1) { + logit_processor.increment_gen_tokens(); + } }; for (size_t running_sequence_id = 0; running_sequence_id < num_running_sequences; ++running_sequence_id) { - auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id); - logit_processor.apply(logit_vector); - Token sampled_token_id; - if (sampling_params.is_greedy_decoding()) { - sampled_token_id = _greedy_sample(logit_vector); - } else { - // is_multinomial() - const bool is_generate_n_tokens = sequence_group->num_total_seqs() == 1; - const size_t num_tokens_per_sequence = is_generate_n_tokens ? 
 
                 for (size_t running_sequence_id = 0; running_sequence_id < num_running_sequences; ++running_sequence_id) {
-                    auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id);
-                    logit_processor.apply(logit_vector);
-                    Token sampled_token_id;
-                    if (sampling_params.is_greedy_decoding()) {
-                        sampled_token_id = _greedy_sample(logit_vector);
-                    } else {
-                        // is_multinomial()
-                        const bool is_generate_n_tokens = sequence_group->num_total_seqs() == 1;
-                        const size_t num_tokens_per_sequence = is_generate_n_tokens ? sampling_params.num_return_sequences : 1;
-                        auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence);
-                        sampled_token_id = sampled_token_ids[0];
-
-                        if (is_generate_n_tokens) {
-                            auto sequence_to_fork = running_sequences[0];
-                            std::list<uint64_t> forked_seq_ids;
-                            for (size_t i = num_running_sequences; i < num_tokens_per_sequence; ++i) {
-                                const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork);
-                                forked_seq_ids.push_back(forked_sequence->get_id());
-                                register_new_token(sampled_token_ids[i], forked_sequence);
+                    const auto running_seq_token_ids = running_sequences[running_sequence_id]->get_generated_ids();
+                    // max length of newly added tokens
+                    auto max_new_tokens_cnt = sampling_params.max_new_tokens - running_seq_token_ids.size() + token_id;
+                    // token offset in case of multi-token inference
+                    auto token_id_per_seq = token_id;
+                    while (--token_id_per_seq >= 0 && --max_new_tokens_cnt >= 0) {
+                        if (max_new_tokens_cnt == 0) {
+                            running_sequences[running_sequence_id]->remove_last_n_tokens(token_id_per_seq);
+                            decrease_len = std::max(decrease_len, token_id_per_seq);
+                            break;
+                        }
+                        Token sampled_token_id;
+                        // get logit processors only for validation or generation, not for cases of extending KV cache
+                        if (token_id_per_seq == 0 && !is_validation_mode_enabled || is_validation_mode_enabled) {
+                            auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id, token_id_per_seq);
+                            logit_processor.apply(logit_vector);
+
+                            if (sampling_params.is_greedy_decoding()) {
+                                sampled_token_id = _greedy_sample(logit_vector);
+                            } else {
+                                // is_multinomial()
+                                const bool is_generate_n_tokens = sequence_group->num_total_seqs() == 1;
+                                const size_t num_tokens_per_sequence = is_generate_n_tokens ? sampling_params.num_return_sequences : 1;
+                                auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence);
+                                sampled_token_id = sampled_token_ids[0];
+
+                                if (is_generate_n_tokens) {
+                                    auto sequence_to_fork = running_sequences[0];
+                                    std::list<uint64_t> forked_seq_ids;
+                                    for (size_t i = num_running_sequences; i < num_tokens_per_sequence; ++i) {
+                                        const auto forked_sequence_id = sequence_group->fork_sequence(sequence_to_fork)->get_id();
+                                        forked_seq_ids.push_back(forked_sequence_id);
+                                        register_new_token(sampled_token_ids[i], forked_sequence_id);
+                                    }
+                                    sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids});
+                                }
                             }
-                            sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids});
                         }
-                    }
-
-                    register_new_token(sampled_token_id, running_sequences[running_sequence_id]);
+
+                        // flag to add sampled token to generated sequence or extend logit processors only
+                        bool is_extend_sequence = token_id_per_seq == 0;
+                        if (token_id_per_seq > 0) {
+                            auto it = running_seq_token_ids.rbegin();
+                            std::advance(it, token_id_per_seq - 1);
+                            // to validate candidates from assisting model and remove incorrect ones from generated sequence
+                            if (is_validation_mode_enabled && *it != sampled_token_id.m_index) {
+                                running_sequences[running_sequence_id]->remove_last_n_tokens(token_id_per_seq);
+                                decrease_len = std::max(decrease_len, token_id_per_seq);
+                                is_extend_sequence = true;
+                                token_id_per_seq = 0;
+                            } else {
+                                sampled_token_id.m_index = *it;
+                            }
+                        }
+                        register_new_token(sampled_token_id, running_sequence_id, is_extend_sequence);
+                        if (token_id_per_seq == 0) {
+                            break;
+                        }
+                    };
                 }
-                logit_processor.increment_gen_tokens();
 
                 for (const auto& dropped_seq_id : sequence_group->try_finish_generation()) {
                     sampler_output.m_dropped_sequences.push_back(dropped_seq_id);
                 }
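
The while loop above is the validation pass for greedy speculative decoding: for every draft token already appended by the assisting model, the main model's logits are sampled at the matching offset, and at the first disagreement the remaining draft tokens are dropped with remove_last_n_tokens and generation continues from the main model's token. A standalone sketch of that acceptance rule follows; validate_draft and ValidationResult are illustrative helpers, not the library API.

    // Greedy acceptance rule for speculative decoding: walk the draft tokens
    // against the main model's choice at each position, stop at the first mismatch.
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct ValidationResult {
        size_t num_accepted;      // how many draft tokens the main model confirmed
        int64_t corrected_token;  // main model's token at the first mismatch (or the bonus token)
    };

    ValidationResult validate_draft(const std::vector<int64_t>& draft_tokens,
                                    const std::vector<int64_t>& main_model_tokens) {
        // main_model_tokens[i] is the main model's greedy pick given the prefix
        // extended by draft_tokens[0..i-1]; it holds one extra "bonus" position.
        size_t i = 0;
        for (; i < draft_tokens.size(); ++i) {
            if (draft_tokens[i] != main_model_tokens[i]) {
                break;  // first mismatch: the rest of the draft is discarded
            }
        }
        return {i, main_model_tokens[i]};
    }

    int main() {
        std::vector<int64_t> draft = {5, 6, 9};          // from the assisting model
        std::vector<int64_t> main_model = {5, 6, 7, 8};  // main model picks, one extra position
        ValidationResult r = validate_draft(draft, main_model);
        return static_cast<int>(r.num_accepted);  // 2: token 9 is rejected, corrected to 7
    }
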
@@ -365,6 +422,9 @@ SamplerOutput Sampler::sample(std::vector<SequenceGroup::Ptr> & sequence_groups,
         // NOTE: it should be before 'get_num_scheduled_tokens' is used
         // update internal state of sequence group to reset scheduler tokens and update currently processed ones
         sequence_group->finish_iteration();
+        if (decrease_len) {
+            sequence_group->decrease_processed_tokens(decrease_len);
+        }
 
         // accumulate a number of processed tokens
         currently_processed_tokens += padded_amount_of_processed_tokens * num_running_sequences;
diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp
index 3825caf446..0384102667 100644
--- a/src/cpp/src/scheduler.hpp
+++ b/src/cpp/src/scheduler.hpp
@@ -62,6 +62,11 @@ class Scheduler {
         return scheduler_output;
     }
 
+    void clean_empty_blocks(std::vector<SequenceGroup::Ptr>& seq_groups) {
+        for (const auto& seq_group : seq_groups)
+            m_block_manager.free_empty_physical_blocks(seq_group);
+    }
+
     const std::vector<KVCacheBlock::Ptr>& get_block_table(const Sequence& seq) {
         return m_block_manager.get_block_table(seq.get_id());
     }
@@ -318,7 +323,7 @@ class Scheduler {
             // prompt phases can have a single running sequence
             OPENVINO_ASSERT(num_running_seqs == 1);
             // here we also assume that sequence must be scheduler in a single shot and has no already generated context
-            if (!m_config.enable_prefix_caching)
+            if (!m_config.enable_prefix_caching && sequence_group->get_validation_len() == 0)
                 OPENVINO_ASSERT(sequence_group->get_context_len() == 0);
 
             size_t num_available_tokens_in_megabatch = m_config.max_num_batched_tokens - scheduler_output.m_total_num_scheduled_tokens;
diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp
index 73298a702c..6d80b74796 100644
--- a/src/cpp/src/sequence_group.hpp
+++ b/src/cpp/src/sequence_group.hpp
@@ -21,6 +21,7 @@ enum class SequenceStatus {
 };
 
 using TokenIds = std::vector<int64_t>;
+using Probs = std::vector<float>;
 
 class SequenceGroup;
 
 class Sequence {
@@ -31,6 +32,7 @@ class Sequence {
     }
 
     TokenIds m_generated_ids;
+    Probs m_generated_probs;
     uint64_t m_grouped_id;
     uint64_t m_id = _get_next_global_sequence_id();
     SequenceStatus m_status = SequenceStatus::RUNNING;
@@ -50,6 +52,7 @@ class Sequence {
     // don't use directly
     Sequence(const Sequence& seq, const uint64_t id) :
         m_generated_ids(seq.m_generated_ids),
+        m_generated_probs(seq.m_generated_probs),
         m_grouped_id(id),
         m_status(seq.m_status),
         m_cumulative_log_prob(seq.m_cumulative_log_prob){
@@ -107,7 +110,19 @@ class Sequence {
     // appends new tokens to a generated part
     void append_token(int64_t token_id, float log_prob) {
         m_cumulative_log_prob += log_prob;
-        m_generated_ids.push_back(token_id);
+        m_generated_ids.push_back(token_id);
+        m_generated_probs.push_back(log_prob);
+    }
+
+    void remove_last_n_tokens(size_t k) {
+        const size_t generated_token_len = get_generated_len();
+        OPENVINO_ASSERT(k <= generated_token_len);
+        for (size_t i = 0; i < k; ++i) {
+            m_cumulative_log_prob -= m_generated_probs[generated_token_len - 1 - i];
+        }
+        size_t new_length = generated_token_len - k;
+        m_generated_ids.resize(new_length);
+        m_generated_probs.resize(new_length);
     }
 
     GenerationOutput get_last_generation_output() {
@@ -127,6 +142,15 @@ class Sequence {
         return m_generated_ids;
     }
 
+    const Probs & get_log_probs() const {
+        return m_generated_probs;
+    }
+
+    void update_log_prob(float new_log_prob, size_t idx) {
+        OPENVINO_ASSERT(idx < m_generated_probs.size());
+        m_generated_probs[idx] = new_log_prob;
+    }
+
     float get_cumulative_log_probs() const {
         return m_cumulative_log_prob;
     }
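
remove_last_n_tokens above keeps m_cumulative_log_prob consistent by subtracting the per-token log-probs that append_token now records in m_generated_probs. A small standalone check of that rollback arithmetic, using a toy ToySeq type rather than the library's Sequence:

    // Dropping the last k tokens must subtract exactly their log-probs, so that
    // cumulative == sum of the remaining log-probs afterwards.
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct ToySeq {
        std::vector<int64_t> ids;
        std::vector<float> log_probs;
        float cumulative = 0.f;

        void append(int64_t id, float lp) {
            ids.push_back(id);
            log_probs.push_back(lp);
            cumulative += lp;
        }

        void remove_last_n(size_t k) {
            assert(k <= ids.size());
            for (size_t i = 0; i < k; ++i) {
                cumulative -= log_probs[log_probs.size() - 1 - i];
            }
            ids.resize(ids.size() - k);
            log_probs.resize(log_probs.size() - k);
        }
    };

    int main() {
        ToySeq s;
        s.append(10, -0.5f);
        s.append(11, -1.0f);
        s.append(12, -2.0f);
        s.remove_last_n(2);  // reject the last two draft tokens
        assert(s.ids.size() == 1);
        return s.cumulative == -0.5f ? 0 : 1;  // only the first token's log-prob remains
    }
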
@@ -174,6 +198,8 @@ class SequenceGroup {
     size_t m_num_scheduled_tokens = 0;
     // context length of longest sequence within a group
     size_t m_max_content_len = 0;
+    // max validation length within a group to check generated tokens
+    size_t m_validation_len = 0;
 
     SequenceGroup(uint64_t request_id, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size, bool enable_prefix_caching)
         : m_request_id(request_id),
@@ -360,24 +386,49 @@ class SequenceGroup {
         m_num_scheduled_tokens = 0;
     }
 
+    void clear_validated_tokens() {
+        m_validation_len = 0;
+    }
+
     bool is_scheduled() const {
         return m_num_scheduled_tokens > 0;
     }
 
+    void set_validation_len(size_t k) {
+        m_validation_len = k;
+    }
+
+    size_t get_validation_len() {
+        return m_validation_len;
+    }
+
     size_t get_num_available_tokens_for_batching() const {
         OPENVINO_ASSERT(!has_finished(), "Internal error: this function cannot be called on finished sequence group");
         OPENVINO_ASSERT(get_num_scheduled_tokens() == 0, "Internal error: this function cannot be called when we are already in scheduling phase");
         // if sequence group has not finished, it has at least one token to process
-        size_t num_available_tokens = std::max(get_prompt_len(), m_max_content_len);
+        size_t num_available_tokens = std::max(get_prompt_len(), m_max_content_len) + m_validation_len;
         return std::max(num_available_tokens - m_num_processed_tokens, 1u);
     }
 
+    void increase_processed_tokens(size_t token_cnt) {
+        m_num_processed_tokens += token_cnt;
+        m_max_content_len += token_cnt;
+    }
+
+    void decrease_processed_tokens(size_t token_cnt) {
+        OPENVINO_ASSERT(m_num_processed_tokens >= token_cnt);
+        m_num_processed_tokens -= token_cnt;
+        OPENVINO_ASSERT(m_max_content_len >= token_cnt);
+        m_max_content_len -= token_cnt;
+    }
+
     // mark current schedule phase as finished and updates internal counters
     void finish_iteration() {
         m_num_processed_tokens += m_num_scheduled_tokens;
         // if some processed tokens were evicted, max content len is greater than number of processed tokens
         m_max_content_len = std::max(m_max_content_len, m_num_processed_tokens);
         clear_scheduled_tokens();
+        clear_validated_tokens();
     }
 
     void update_processed_tokens_num(size_t processed_tokens) {
@@ -513,9 +564,10 @@ class SequenceGroup {
             }
         } else if (m_sampling_params.is_greedy_decoding() || m_sampling_params.is_multinomial()) {
             // TO DO: Now we always stream for greedy search for the sake of benchmarking
-            if (num_total_seqs() == 1) {
-                push_partial_outputs();
-            } else if (has_finished() || out_of_memory()) {
+            // if (num_total_seqs() == 1) {
+            //     push_partial_outputs();
+            // } else
+            if (has_finished() || out_of_memory()) {
                 push_outputs();
             }
         }
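
The m_validation_len counter added above is what lets the scheduler batch draft tokens: a sequence group carrying K unverified tokens from the assisting model advertises K extra tokens in get_num_available_tokens_for_batching, so the main model can score all of them in one forward pass, and the counter is reset again each iteration. Below is a standalone sketch of that flow; ToyGroup is an illustrative stand-in, not the library's SequenceGroup.

    // K draft tokens raise the number of tokens the group asks the scheduler for,
    // and the validation length is consumed once the iteration finishes.
    #include <algorithm>
    #include <cstddef>

    struct ToyGroup {
        size_t prompt_len = 0;
        size_t max_content_len = 0;
        size_t processed_tokens = 0;
        size_t validation_len = 0;

        void set_validation_len(size_t k) { validation_len = k; }

        size_t num_available_tokens_for_batching() const {
            size_t available = std::max(prompt_len, max_content_len) + validation_len;
            return std::max(available - processed_tokens, static_cast<size_t>(1));
        }

        void finish_iteration(size_t scheduled) {
            processed_tokens += scheduled;
            max_content_len = std::max(max_content_len, processed_tokens);
            validation_len = 0;  // validated tokens are consumed every iteration
        }
    };

    int main() {
        ToyGroup g;
        g.prompt_len = 16;
        g.max_content_len = 20;   // prompt plus 4 generated tokens
        g.processed_tokens = 20;
        g.set_validation_len(5);  // 5 unverified draft tokens appended by the assisting model
        size_t to_schedule = g.num_available_tokens_for_batching();  // 5 tokens to validate
        g.finish_iteration(to_schedule);
        return static_cast<int>(to_schedule);  // 5
    }
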