diff --git a/src/cpp/continuous_batching/src/block_manager.hpp b/src/cpp/continuous_batching/src/block_manager.hpp index 0d61479609..2d0e25e13a 100644 --- a/src/cpp/continuous_batching/src/block_manager.hpp +++ b/src/cpp/continuous_batching/src/block_manager.hpp @@ -110,6 +110,78 @@ class BlockManager { return m_block_table[seq_id]; } + const size_t free_rightest_blocks(SequenceGroup::Ptr sequence_group) { + size_t blocks_released = 0; + auto running_sequences = sequence_group->get_not_finished_sequences(); + std::set blocks_released_indices; + for (size_t idx = 0; idx < running_sequences.size(); ++idx) { + auto seq_id = running_sequences[idx]->get_id(); + OPENVINO_ASSERT(m_block_table.count(seq_id) > 0, "Invalid sequence group."); + auto block_table = m_block_table[seq_id]; + if (free_last_block(seq_id)) { + blocks_released++; + } + } + return blocks_released; + } + + const bool free_group_partially_multiple_runnning_sequence(SequenceGroup::Ptr sequence_group, size_t num_required_blocks, size_t& phisical_blocks_released, size_t& logical_blocks_released) { + phisical_blocks_released = 0; + logical_blocks_released = 0; + while (num_required_blocks > phisical_blocks_released) { + size_t released_count = free_rightest_blocks(sequence_group); + logical_blocks_released += 1; + if (get_number_of_blocks_occupied_by_sequence(sequence_group) == 0) { + break; + } + phisical_blocks_released += released_count; + } + phisical_blocks_released = phisical_blocks_released; + return num_required_blocks <= phisical_blocks_released; + } + + const bool free_group_partially_single_runnning_sequence(SequenceGroup::Ptr sequence_group, size_t num_required_blocks, size_t& phisical_blocks_released) { + auto sequences = sequence_group->get_not_finished_sequences(); + OPENVINO_ASSERT(sequences.size() == 1); + auto running_sequence = sequences[0]; + auto seq_id = running_sequence->get_id(); + if (!has_block_table(seq_id)) { + // no blocks are allocated for this sequence, so it can't be preempted + return false; + } + auto block_table = get_block_table(seq_id); + auto prev_blocks_count = num_free_blocks(); + free_sequence_partially_single_runnning_sequence(seq_id, num_required_blocks); + + // calculate the number of released blocks + phisical_blocks_released = num_free_blocks() - prev_blocks_count; + + return num_required_blocks <= phisical_blocks_released; + } + + const size_t get_number_of_blocks_occupied_by_sequence(SequenceGroup::Ptr sequence_group) { + auto running_sequences = sequence_group->get_not_finished_sequences(); + size_t num_blocks = 0; + std::set indices; + for (size_t idx = 0; idx < running_sequences.size(); ++idx) { + auto seq_id = running_sequences[idx]->get_id(); + if (m_block_table.count(seq_id) == 0) { + continue; + } + // OPENVINO_ASSERT(m_block_table.count(seq_id) > 0, "Invalid sequence group."); + auto block_table = m_block_table[seq_id]; + size_t last_idx = block_table.back()->get_index(); + if (indices.find(last_idx) != indices.end()) { + continue; + } + else { + indices.insert(last_idx); + num_blocks += block_table.size(); + } + } + return num_blocks; + } + const bool has_block_table(uint64_t seq_id) { return m_block_table.count(seq_id) > 0; } @@ -153,11 +225,23 @@ class BlockManager { OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1); } - void free_sequence_partially(size_t seq_id, size_t block_num) { - // currently this method is applicable only for groups with single sequences - // TODO: support for groups with multiple sequences + bool free_last_block(size_t seq_id) { auto block_table = m_block_table[seq_id]; + OPENVINO_ASSERT(block_table.size() >= 1); + size_t block_idx = m_block_table[seq_id].size() - 1; + m_allocator.free(block_table[block_idx]); + m_block_table[seq_id].resize(m_block_table[seq_id].size() - 1); + if (m_block_table[seq_id].size() == 0) { + OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1); + } + return block_table[block_idx]->is_free(); + } + + void free_sequence_partially_single_runnning_sequence(size_t seq_id, size_t block_num) { + // this method is applicable only for groups with single sequences + + auto block_table = m_block_table[seq_id]; OPENVINO_ASSERT(block_table.size() >= block_num); for (size_t idx = 0; idx < block_num; idx++) { size_t block_idx = m_block_table[seq_id].size() - idx - 1; @@ -166,7 +250,7 @@ class BlockManager { } m_block_table[seq_id].resize(m_block_table[seq_id].size() - block_num); - if (m_block_table.size() == 0) { + if (m_block_table[seq_id].size() == 0) { OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1); } } @@ -200,6 +284,7 @@ class BlockManager { if (last_block_ids.find(last_block_id) != last_block_ids.end()) // this block was already processed continue; + last_block_ids.insert(last_block_id); size_t needed_blocks_per_sequence = seq_group->get_num_logical_blocks() - num_physical_blocks; diff --git a/src/cpp/continuous_batching/src/sampler.hpp b/src/cpp/continuous_batching/src/sampler.hpp index 6672825b15..ff0b463bd3 100644 --- a/src/cpp/continuous_batching/src/sampler.hpp +++ b/src/cpp/continuous_batching/src/sampler.hpp @@ -324,15 +324,6 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, if (m_beam_search_info.find(request_id) == m_beam_search_info.end()) { m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group)); } - else { - // sequence group can be empty if returned after preemption - if (sequence_group->is_empty()) { - // clear beam search info - m_beam_search_info.erase(request_id); - m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group)); - } - } - // current algorithm already adds new tokens to running sequences and m_beam_search_info.at(request_id).select_next_tokens(sequence_group_logits, sampler_output); diff --git a/src/cpp/continuous_batching/src/scheduler.hpp b/src/cpp/continuous_batching/src/scheduler.hpp index f463d681d2..ed882dcf9c 100644 --- a/src/cpp/continuous_batching/src/scheduler.hpp +++ b/src/cpp/continuous_batching/src/scheduler.hpp @@ -101,52 +101,45 @@ class Scheduler { size_t prev_blocks_count = m_block_manager.num_free_blocks(); size_t num_running_sequences = sequence_group->num_running_seqs(); size_t preempted_tokens = 0; + size_t num_blocks_occupied_by_sequence = m_block_manager.get_number_of_blocks_occupied_by_sequence(sequence_group); - if (num_running_sequences > 1) { - for (size_t s = 0; s < sequence_group->num_running_seqs(); ++s) { - auto seq_id = (*sequence_group)[s]->get_id(); + if (num_blocks_occupied_by_sequence <= blocks_needed) { + auto sequences = sequence_group->get_not_finished_sequences(); + for (size_t s = 0; s < sequences.size(); ++s) { + auto seq_id = sequences[s]->get_id(); m_block_manager.free_sequence(seq_id); } - sequence_group->reset(); + sequence_group->preempt_tokens(processed_tokens); sequence_group->set_waiting(); return m_block_manager.num_free_blocks() > prev_blocks_count; } - // currently partial preemtion is enabled only for single running sequence case - // TODO: implement partial preemption for case with muliple sequences in group - for (size_t s = 0; s < num_running_sequences; ++s) { - auto seq_id = (*sequence_group)[s]->get_id(); - if (!m_block_manager.has_block_table(seq_id)) { - // no blocks are allocated for this sequence, so it can't be preempted - return false; - } - auto block_table = m_block_manager.get_block_table(seq_id); - size_t required_blocks = blocks_needed - total_num_released_blocks; - if (required_blocks >= block_table.size()) { - // fully drop a sequence(s) from block_manager - m_block_manager.free_sequence(seq_id); - } - else { - m_block_manager.free_sequence_partially(seq_id, required_blocks); - } - - // calculate the number of released blocks - auto released_blocks = m_block_manager.num_free_blocks() - prev_blocks_count; - total_num_released_blocks += released_blocks; - prev_blocks_count = m_block_manager.num_free_blocks(); - + if (num_running_sequences > 1) { + size_t phisycal_blocks_released; + size_t logical_blocks_released; + m_block_manager.free_group_partially_multiple_runnning_sequence(sequence_group, blocks_needed, phisycal_blocks_released, logical_blocks_released); // calculate the number of preempted tokens auto tokens_in_last_block = processed_tokens % block_size; if (tokens_in_last_block == 0) { tokens_in_last_block = block_size; } + preempted_tokens = tokens_in_last_block + std::max((int)logical_blocks_released - 1, 0) * block_size; - preempted_tokens += tokens_in_last_block + std::max((int)released_blocks - 1, 0) * block_size; - if (m_block_manager.num_free_blocks() >= blocks_needed) { - break; + } + else { + OPENVINO_ASSERT(num_running_sequences == 1); + size_t phisycal_blocks_released; + m_block_manager.free_group_partially_single_runnning_sequence(sequence_group, blocks_needed, phisycal_blocks_released); + + // calculate the number of preempted tokens + auto tokens_in_last_block = processed_tokens % block_size; + if (tokens_in_last_block == 0) { + tokens_in_last_block = block_size; } + preempted_tokens = tokens_in_last_block + std::max((int)phisycal_blocks_released - 1, 0) * block_size; } + // case when preemption requires preempt prompt tokens if (!m_config.dynamic_split_fuse && processed_tokens - preempted_tokens < sequence_group->get_prompt_len()) { // preempt prompt fully to not leave partially generated prompt diff --git a/src/cpp/continuous_batching/src/sequence_group.hpp b/src/cpp/continuous_batching/src/sequence_group.hpp index 4897789f6f..3d6e61f407 100644 --- a/src/cpp/continuous_batching/src/sequence_group.hpp +++ b/src/cpp/continuous_batching/src/sequence_group.hpp @@ -266,6 +266,17 @@ class SequenceGroup { return running_seqs; } + std::vector get_not_finished_sequences() { + std::vector running_seqs; + for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) { + if (!m_sequences[seq_id]->has_finished()) { + running_seqs.emplace_back(m_sequences[seq_id]); + } + } + + return running_seqs; + } + std::vector get_running_sequences() const { std::vector running_seqs; for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) { @@ -367,24 +378,6 @@ class SequenceGroup { return m_sampling_params; } - void reset() { - m_sequences.clear(); - m_next_sequence_id = 0; - add_sequence(Sequence::create(m_next_sequence_id++)); - clear_scheduled_tokens(); - m_num_processed_tokens = 0; - m_max_content_len = 0; - } - - bool is_empty() { - if (m_sequences.size() > 1) - return false; - OPENVINO_ASSERT(m_sequences.size() == 1); - if (m_sequences[0]->get_generated_len() > 0 || m_sequences[0]->get_cumulative_log_probs() != 0.0f) - return false; - return true; - } - void set_out_of_memory() { for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) { if (m_sequences[seq_id]->is_running()) { diff --git a/src/cpp/continuous_batching/src/tests/block_manager.cpp b/src/cpp/continuous_batching/src/tests/block_manager.cpp index 6927a98164..89d88ed54c 100644 --- a/src/cpp/continuous_batching/src/tests/block_manager.cpp +++ b/src/cpp/continuous_batching/src/tests/block_manager.cpp @@ -17,7 +17,7 @@ TEST(TestBlockManager, general_test) { EXPECT_EQ(bm.get_block_table(0).size(), 6); EXPECT_EQ(bm.num_free_blocks(), 0); - bm.free_sequence_partially(0, 4); + bm.free_sequence_partially_single_runnning_sequence(0, 4); EXPECT_EQ(bm.get_block_table(0).size(), 2); EXPECT_EQ(bm.num_free_blocks(), 4); @@ -29,4 +29,42 @@ TEST(TestBlockManager, general_test) { bm.fork_sequence(0, 1); EXPECT_TRUE(bm.has_block_table(1)); EXPECT_EQ(bm.get_block_table(1).back()->get_references_count(), 2); + } + +TEST(TestBlockManager, required_blocks_count) { + BlockManager bm = BlockManager(8); + + std::vector tokens = {0,1,2,3,4}; + SequenceGroup::Ptr sequence_group = std::make_shared( + 0, + ov::Tensor(ov::element::i64, { + tokens.size()}, tokens.data()), + GenerationConfig::beam_search(), + 4); + sequence_group->schedule_tokens(5); + auto required_blocks = bm.required_blocks_count(sequence_group); + EXPECT_EQ(required_blocks, 2); + EXPECT_TRUE(bm.can_append_slots(sequence_group)); + bm.append_slots(sequence_group); + EXPECT_EQ(bm.num_free_blocks(), 6); + + sequence_group->finish_iteration(); + auto sequence_to_fork = sequence_group->get_running_sequences()[0]; + for (size_t i = 0; i < 4; ++i) { + const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork); + bm.fork_sequence(sequence_to_fork->get_id(), forked_sequence->get_id()); + } + sequence_group->schedule_tokens(1); + required_blocks = bm.required_blocks_count(sequence_group); + EXPECT_EQ(required_blocks, 4); + EXPECT_TRUE(bm.can_append_slots(sequence_group)); + bm.append_slots(sequence_group); + EXPECT_EQ(bm.num_free_blocks(), 2); + sequence_group->finish_iteration(); + + sequence_group->schedule_tokens(3); + required_blocks = bm.required_blocks_count(sequence_group); + EXPECT_EQ(required_blocks, 5); + EXPECT_FALSE(bm.can_append_slots(sequence_group)); +} \ No newline at end of file diff --git a/tests/python_tests/continuous_batching/test_preemption.py b/tests/python_tests/continuous_batching/test_preemption.py index ca7cb649aa..6f9e6ad254 100644 --- a/tests/python_tests/continuous_batching/test_preemption.py +++ b/tests/python_tests/continuous_batching/test_preemption.py @@ -5,16 +5,36 @@ from dataclasses import dataclass from typing import List +from openvino_genai.py_continuous_batching import GenerationConfig from common import get_model_and_tokenizer, save_ov_model_from_optimum, generate_and_compare_with_reference_text, \ DEFAULT_SCHEDULER_CONFIG, get_scheduler_config, run_test_pipeline, get_models_list, get_beam_search, get_greedy, \ get_multinomial_all_parameters, get_multinomial_temperature_and_num_return_sequence, \ get_multinomial_temperature_and_top_k, get_multinomial_temperature, get_multinomial_temperature_and_top_p from test_sampling import RandomSamplingTestStruct +def get_greedy_seq_len_300() -> GenerationConfig: + generation_config = GenerationConfig() + generation_config.num_return_sequences = 3 + generation_config.max_new_tokens = 300 + return generation_config + +def get_beam_search_seq_len_300() -> GenerationConfig: + generation_config = GenerationConfig() + generation_config.num_groups = 3 + generation_config.group_size = 2 + generation_config.max_new_tokens = 300 + generation_config.num_return_sequences = 3 + generation_config.num_return_sequences = generation_config.num_groups * generation_config.group_size + return generation_config + scheduler_params_list = [({"num_kv_blocks": 2, "block_size": 32, "dynamic_split_fuse": True, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_greedy()), ({"num_kv_blocks": 2, "block_size": 32, "dynamic_split_fuse": False, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_greedy()), + ({"num_kv_blocks": 10, "block_size": 32, "dynamic_split_fuse": True}, get_greedy_seq_len_300()), + ({"num_kv_blocks": 10, "block_size": 32, "dynamic_split_fuse": False}, get_greedy_seq_len_300()), ({"num_kv_blocks": 34, "block_size": 32, "dynamic_split_fuse": True, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_beam_search()), - ({"num_kv_blocks": 34, "block_size": 32, "dynamic_split_fuse": False, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_beam_search())] + ({"num_kv_blocks": 34, "block_size": 32, "dynamic_split_fuse": False, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_beam_search()), + ({"num_kv_blocks": 100, "block_size": 32, "dynamic_split_fuse": True}, get_beam_search_seq_len_300()), + ({"num_kv_blocks": 100, "block_size": 32, "dynamic_split_fuse": False}, get_beam_search_seq_len_300())] @pytest.mark.parametrize("params", scheduler_params_list) @pytest.mark.precommit def test_preemption(tmp_path, params): @@ -62,22 +82,21 @@ def test_preemption_with_multinomial(tmp_path, dynamic_split_fuse): ], ref_texts=[ [ - "\nI've seen this expression used too many times without making sense.\nAs an AI engineer, and as a scientist, we should all be looking" + "\nI've seen this expression used too many times without making sense.\nAs an AI engineer, and as a scientist, we should make everything easier" ], [ ' significance of 3862?\n3829\nWhat is the greatest common divisor of 15 and 7763?\n9\nCalculate the', - ' third derivative of 939*v**3*r**2 + 133*v**3*r**2 + v**3 - 77*', - " climate in the future? Do we have things to catch on fire, and if so does that mean we'll have a new climate before we have" + ' third derivative of 939*v**3*r**2 + 133*v**3*r**2 + v**3 - 16*', + " climate in the future? Do we have things to catch on fire, and if so does that mean we'll have a new climate change or is" ], [ - "\nIt's in the middle of nowhere if you haven’t seen one yet! It might be more convenient there than anywhere else 😊 we", - '\nUAE is a country with some great culture that has been living under Islamic oppression for almost 60 years now (including 20 years before) so no', - "\nI don't know anything. I'm not sure what kind this sub wants though... but apparently they are pretty bad at taking selfies too..", - '\nNope, just wanted to say how awesome and beautiful it was when my brother came back from an adventure trip across Asia - very much alive on' + "\nIt's in the middle of nowhere if you haven’t seen one yet! It might be more convenient there than anywhere else.. maybe take", + '\nUAE is a country with some great culture that has been living under Islamic oppression for almost 60 years now (including 20 years as part of Arab', + '\nNope, just wanted to say how awesome and beautiful it was when my brother came back from an adventure trip across Asia - our 2nd year', + '\nI don\'t know anything. I\'m not sure what kind this sub wants though... but apparently they are pretty bad at making videos/photos' ], ]) -@pytest.mark.skip(reason="should be fixed by support of n seqs in preemption") @pytest.mark.parametrize("dynamic_split_fuse", [True, False]) @pytest.mark.precommit def test_preemption_with_multinomial_n_seq(tmp_path, dynamic_split_fuse):