Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial preemption for groups with multiple sequences #574

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions src/cpp/continuous_batching/src/block_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,78 @@ class BlockManager {
return m_block_table[seq_id];
}

const size_t free_rightest_blocks(SequenceGroup::Ptr sequence_group) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this (or some other) method is not planned to be used as public API, let's move it to the private section.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this method.

size_t blocks_released = 0;
auto running_sequences = sequence_group->get_not_finished_sequences();
std::set<size_t> blocks_released_indices;
for (size_t idx = 0; idx < running_sequences.size(); ++idx) {
auto seq_id = running_sequences[idx]->get_id();
OPENVINO_ASSERT(m_block_table.count(seq_id) > 0, "Invalid sequence group.");
auto block_table = m_block_table[seq_id];
if (free_last_block(seq_id)) {
blocks_released++;
}
}
return blocks_released;
}

const bool free_group_partially_multiple_runnning_sequence(SequenceGroup::Ptr sequence_group, size_t num_required_blocks, size_t& phisical_blocks_released, size_t& logical_blocks_released) {
phisical_blocks_released = 0;
logical_blocks_released = 0;
while (num_required_blocks > phisical_blocks_released) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case we need to preempt a very long sequence, such a loop can be expensive.

If it's possible, it would be great to compute a number of preempted blocks based on required number of blocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified this method using formula blocks_to_remove_per_sequence == ceil(num_required_blocks / sequence_num).

size_t released_count = free_rightest_blocks(sequence_group);
logical_blocks_released += 1;
if (get_number_of_blocks_occupied_by_sequence(sequence_group) == 0) {
break;
}
phisical_blocks_released += released_count;
}
phisical_blocks_released = phisical_blocks_released;
return num_required_blocks <= phisical_blocks_released;
}

// Partially preempts a group that has exactly one not-finished sequence by
// releasing up to `num_required_blocks` of its rightmost (newest) blocks.
// @param sequence_group        group with a single running sequence
// @param num_required_blocks   number of physical blocks the scheduler needs
// @param phisical_blocks_released [out] number of physical blocks actually freed
// @return true if at least `num_required_blocks` blocks were released
const bool free_group_partially_single_runnning_sequence(SequenceGroup::Ptr sequence_group, size_t num_required_blocks, size_t& phisical_blocks_released) {
    auto sequences = sequence_group->get_not_finished_sequences();
    OPENVINO_ASSERT(sequences.size() == 1);
    auto running_sequence = sequences[0];
    auto seq_id = running_sequence->get_id();
    if (!has_block_table(seq_id)) {
        // no blocks are allocated for this sequence, so it can't be preempted
        return false;
    }
    // NOTE: removed an unused deep copy of the block table here
    auto prev_blocks_count = num_free_blocks();
    free_sequence_partially_single_runnning_sequence(seq_id, num_required_blocks);

    // calculate the number of released blocks
    phisical_blocks_released = num_free_blocks() - prev_blocks_count;

    return num_required_blocks <= phisical_blocks_released;
}

// Counts physical blocks occupied by all not-finished sequences of a group.
// Sequences that share their last block (forked beams that have not diverged
// in the last block yet) are counted only once, keyed by the last block index.
// @param sequence_group group whose footprint is being measured
// @return total number of occupied blocks (0 if nothing is allocated)
const size_t get_number_of_blocks_occupied_by_sequence(SequenceGroup::Ptr sequence_group) {
    auto running_sequences = sequence_group->get_not_finished_sequences();
    size_t num_blocks = 0;
    std::set<size_t> indices;  // last-block indices already accounted for
    for (size_t idx = 0; idx < running_sequences.size(); ++idx) {
        auto seq_id = running_sequences[idx]->get_id();
        auto table_it = m_block_table.find(seq_id);
        if (table_it == m_block_table.end()) {
            // sequence has no allocated blocks (e.g. already preempted)
            continue;
        }
        const auto& block_table = table_it->second;  // avoid copying the vector
        size_t last_idx = block_table.back()->get_index();
        // insert() returns {it, true} only for a new last-block index,
        // i.e. a sequence whose blocks were not counted yet
        if (indices.insert(last_idx).second) {
            num_blocks += block_table.size();
        }
    }
    return num_blocks;
}

// Tells whether any blocks are currently allocated for the given sequence id.
const bool has_block_table(uint64_t seq_id) {
    return m_block_table.find(seq_id) != m_block_table.end();
}
Expand Down Expand Up @@ -153,11 +225,23 @@ class BlockManager {
OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1);
}

void free_sequence_partially(size_t seq_id, size_t block_num) {
// currently this method is applicable only for groups with single sequences
// TODO: support for groups with multiple sequences
// Releases the rightmost (most recently allocated) block of a sequence and
// drops the sequence's block table entirely once it becomes empty.
// @param seq_id sequence whose last block is released
// @return true if the block became physically free (its reference count
//         dropped to zero), false if it is still shared by another sequence
bool free_last_block(size_t seq_id) {
    auto& block_table = m_block_table[seq_id];  // reference: no vector copy
    OPENVINO_ASSERT(block_table.size() >= 1);
    // keep a handle to the block before shrinking the table, so we can
    // query is_free() safely even after the map entry is erased below
    auto last_block = block_table.back();
    m_allocator.free(last_block);
    block_table.pop_back();

    if (block_table.size() == 0) {
        // NOTE: this invalidates the `block_table` reference — do not touch it after
        OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1);
    }
    return last_block->is_free();
}

void free_sequence_partially_single_runnning_sequence(size_t seq_id, size_t block_num) {
// this method is applicable only for groups with single sequences

auto block_table = m_block_table[seq_id];
OPENVINO_ASSERT(block_table.size() >= block_num);
for (size_t idx = 0; idx < block_num; idx++) {
size_t block_idx = m_block_table[seq_id].size() - idx - 1;
Expand All @@ -166,7 +250,7 @@ class BlockManager {
}
m_block_table[seq_id].resize(m_block_table[seq_id].size() - block_num);

if (m_block_table.size() == 0) {
if (m_block_table[seq_id].size() == 0) {
OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1);
}
}
Expand Down Expand Up @@ -200,6 +284,7 @@ class BlockManager {
if (last_block_ids.find(last_block_id) != last_block_ids.end())
// this block was already processed
continue;
last_block_ids.insert(last_block_id);

size_t needed_blocks_per_sequence = seq_group->get_num_logical_blocks() - num_physical_blocks;

Expand Down
9 changes: 0 additions & 9 deletions src/cpp/continuous_batching/src/sampler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,15 +324,6 @@ SamplerOutput Sampler::sample(std::vector<SequenceGroup::Ptr> & sequence_groups,
if (m_beam_search_info.find(request_id) == m_beam_search_info.end()) {
m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group));
}
else {
// sequence group can be empty if returned after preemption
if (sequence_group->is_empty()) {
// clear beam search info
m_beam_search_info.erase(request_id);
m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group));
}
}


// current algorithm already adds new tokens to running sequences and
m_beam_search_info.at(request_id).select_next_tokens(sequence_group_logits, sampler_output);
Expand Down
53 changes: 23 additions & 30 deletions src/cpp/continuous_batching/src/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,52 +101,45 @@ class Scheduler {
size_t prev_blocks_count = m_block_manager.num_free_blocks();
size_t num_running_sequences = sequence_group->num_running_seqs();
size_t preempted_tokens = 0;
size_t num_blocks_occupied_by_sequence = m_block_manager.get_number_of_blocks_occupied_by_sequence(sequence_group);

if (num_running_sequences > 1) {
for (size_t s = 0; s < sequence_group->num_running_seqs(); ++s) {
auto seq_id = (*sequence_group)[s]->get_id();
if (num_blocks_occupied_by_sequence <= blocks_needed) {
auto sequences = sequence_group->get_not_finished_sequences();
for (size_t s = 0; s < sequences.size(); ++s) {
auto seq_id = sequences[s]->get_id();
m_block_manager.free_sequence(seq_id);
}
sequence_group->reset();
sequence_group->preempt_tokens(processed_tokens);
sequence_group->set_waiting();
return m_block_manager.num_free_blocks() > prev_blocks_count;
}

// currently partial preemtion is enabled only for single running sequence case
// TODO: implement partial preemption for case with muliple sequences in group
for (size_t s = 0; s < num_running_sequences; ++s) {
auto seq_id = (*sequence_group)[s]->get_id();
if (!m_block_manager.has_block_table(seq_id)) {
// no blocks are allocated for this sequence, so it can't be preempted
return false;
}
auto block_table = m_block_manager.get_block_table(seq_id);
size_t required_blocks = blocks_needed - total_num_released_blocks;
if (required_blocks >= block_table.size()) {
// fully drop a sequence(s) from block_manager
m_block_manager.free_sequence(seq_id);
}
else {
m_block_manager.free_sequence_partially(seq_id, required_blocks);
}

// calculate the number of released blocks
auto released_blocks = m_block_manager.num_free_blocks() - prev_blocks_count;
total_num_released_blocks += released_blocks;
prev_blocks_count = m_block_manager.num_free_blocks();

if (num_running_sequences > 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to distinguish such cases? It looks like multiple sequences within a group is the more generic case and should cover the single-sequence case as well.

Copy link
Contributor Author

@popovaan popovaan Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary to distinguish. But for single sequence case we can release blocks more efficiently than in general case using resize and not release blocks layer by layer:

m_block_table[seq_id].resize(m_block_table[seq_id].size() - block_num);

So it was distinguished only in terms of efficiency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed distinguishing between these cases in PR as discussed.

size_t phisycal_blocks_released;
size_t logical_blocks_released;
m_block_manager.free_group_partially_multiple_runnning_sequence(sequence_group, blocks_needed, phisycal_blocks_released, logical_blocks_released);

// calculate the number of preempted tokens
auto tokens_in_last_block = processed_tokens % block_size;
if (tokens_in_last_block == 0) {
tokens_in_last_block = block_size;
}
preempted_tokens = tokens_in_last_block + std::max<size_t>((int)logical_blocks_released - 1, 0) * block_size;

preempted_tokens += tokens_in_last_block + std::max<size_t>((int)released_blocks - 1, 0) * block_size;
if (m_block_manager.num_free_blocks() >= blocks_needed) {
break;
}
else {
OPENVINO_ASSERT(num_running_sequences == 1);
size_t phisycal_blocks_released;
m_block_manager.free_group_partially_single_runnning_sequence(sequence_group, blocks_needed, phisycal_blocks_released);

// calculate the number of preempted tokens
auto tokens_in_last_block = processed_tokens % block_size;
if (tokens_in_last_block == 0) {
tokens_in_last_block = block_size;
}
preempted_tokens = tokens_in_last_block + std::max<size_t>((int)phisycal_blocks_released - 1, 0) * block_size;
}

// case when preemption requires preempt prompt tokens
if (!m_config.dynamic_split_fuse && processed_tokens - preempted_tokens < sequence_group->get_prompt_len()) {
// preempt prompt fully to not leave partially generated prompt
Expand Down
29 changes: 11 additions & 18 deletions src/cpp/continuous_batching/src/sequence_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,17 @@ class SequenceGroup {
return running_seqs;
}

std::vector<Sequence::Ptr> get_not_finished_sequences() {
std::vector<Sequence::Ptr> running_seqs;
for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) {
if (!m_sequences[seq_id]->has_finished()) {
running_seqs.emplace_back(m_sequences[seq_id]);
}
}

return running_seqs;
}

std::vector<Sequence::CPtr> get_running_sequences() const {
std::vector<Sequence::CPtr> running_seqs;
for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) {
Expand Down Expand Up @@ -367,24 +378,6 @@ class SequenceGroup {
return m_sampling_params;
}

// Resets the group to its initial state: drops all sequences, restarts the
// sequence id counter, and seeds the group with a single fresh sequence.
// Used when a group is fully preempted and must restart generation from scratch.
void reset() {
    m_sequences.clear();
    m_next_sequence_id = 0;
    // re-create the initial sequence with id 0, as on group construction
    add_sequence(Sequence::create(m_next_sequence_id++));
    clear_scheduled_tokens();
    m_num_processed_tokens = 0;
    m_max_content_len = 0;
}

// Returns true if the group is in its pristine state: exactly one sequence
// that has generated nothing yet. Used to detect a group returned after full
// preemption. NOTE: asserts (rather than returns) if the group has no
// sequences at all — callers must ensure at least one sequence exists.
bool is_empty() {
    if (m_sequences.size() > 1)
        return false;
    OPENVINO_ASSERT(m_sequences.size() == 1);
    // any generated token or accumulated log-prob means generation has started
    if (m_sequences[0]->get_generated_len() > 0 || m_sequences[0]->get_cumulative_log_probs() != 0.0f)
        return false;
    return true;
}

void set_out_of_memory() {
for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) {
if (m_sequences[seq_id]->is_running()) {
Expand Down
40 changes: 39 additions & 1 deletion src/cpp/continuous_batching/src/tests/block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ TEST(TestBlockManager, general_test) {
EXPECT_EQ(bm.get_block_table(0).size(), 6);
EXPECT_EQ(bm.num_free_blocks(), 0);

bm.free_sequence_partially(0, 4);
bm.free_sequence_partially_single_runnning_sequence(0, 4);
EXPECT_EQ(bm.get_block_table(0).size(), 2);
EXPECT_EQ(bm.num_free_blocks(), 4);

Expand All @@ -29,4 +29,42 @@ TEST(TestBlockManager, general_test) {
bm.fork_sequence(0, 1);
EXPECT_TRUE(bm.has_block_table(1));
EXPECT_EQ(bm.get_block_table(1).back()->get_references_count(), 2);

}

// Verifies BlockManager::required_blocks_count() across the lifecycle of a
// beam-search group: initial prompt allocation, forked sequences sharing
// blocks (copy-on-write), and the out-of-memory case.
TEST(TestBlockManager, required_blocks_count) {
    BlockManager bm = BlockManager(8);  // 8 physical KV blocks, block_size = 4 (below)

    std::vector<uint64_t> tokens = {0,1,2,3,4};
    SequenceGroup::Ptr sequence_group = std::make_shared<SequenceGroup>(
        0,
        ov::Tensor(ov::element::i64, {
        tokens.size()}, tokens.data()),
        GenerationConfig::beam_search(),
        4);
    // 5 prompt tokens with block_size 4 -> 2 blocks needed
    sequence_group->schedule_tokens(5);
    auto required_blocks = bm.required_blocks_count(sequence_group);
    EXPECT_EQ(required_blocks, 2);
    EXPECT_TRUE(bm.can_append_slots(sequence_group));
    bm.append_slots(sequence_group);
    EXPECT_EQ(bm.num_free_blocks(), 6);

    sequence_group->finish_iteration();
    // fork 4 more sequences off the first one; forks share existing blocks
    auto sequence_to_fork = sequence_group->get_running_sequences()[0];
    for (size_t i = 0; i < 4; ++i) {
        const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork);
        bm.fork_sequence(sequence_to_fork->get_id(), forked_sequence->get_id());
    }
    // next token lands in a shared last block -> each of 5 sequences needs
    // its own copy of that block, but one can reuse it: 4 new blocks
    sequence_group->schedule_tokens(1);
    required_blocks = bm.required_blocks_count(sequence_group);
    EXPECT_EQ(required_blocks, 4);
    EXPECT_TRUE(bm.can_append_slots(sequence_group));
    bm.append_slots(sequence_group);
    EXPECT_EQ(bm.num_free_blocks(), 2);
    sequence_group->finish_iteration();

    // 3 more tokens would require 5 blocks, but only 2 are free -> cannot append
    sequence_group->schedule_tokens(3);
    required_blocks = bm.required_blocks_count(sequence_group);
    EXPECT_EQ(required_blocks, 5);
    EXPECT_FALSE(bm.can_append_slots(sequence_group));
}
37 changes: 28 additions & 9 deletions tests/python_tests/continuous_batching/test_preemption.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,36 @@
from dataclasses import dataclass
from typing import List

from openvino_genai.py_continuous_batching import GenerationConfig
from common import get_model_and_tokenizer, save_ov_model_from_optimum, generate_and_compare_with_reference_text, \
DEFAULT_SCHEDULER_CONFIG, get_scheduler_config, run_test_pipeline, get_models_list, get_beam_search, get_greedy, \
get_multinomial_all_parameters, get_multinomial_temperature_and_num_return_sequence, \
get_multinomial_temperature_and_top_k, get_multinomial_temperature, get_multinomial_temperature_and_top_p
from test_sampling import RandomSamplingTestStruct

def get_greedy_seq_len_300() -> GenerationConfig:
    # Greedy config producing long outputs (300 tokens) to trigger preemption.
    config = GenerationConfig()
    config.num_return_sequences = 3
    config.max_new_tokens = 300
    return config

def get_beam_search_seq_len_300() -> GenerationConfig:
    # Beam-search config producing long outputs (300 tokens) to trigger preemption.
    # Returns one sequence per beam: num_groups * group_size.
    generation_config = GenerationConfig()
    generation_config.num_groups = 3
    generation_config.group_size = 2
    generation_config.max_new_tokens = 300
    # NOTE: removed a dead assignment (num_return_sequences = 3) that was
    # immediately overwritten by the line below
    generation_config.num_return_sequences = generation_config.num_groups * generation_config.group_size
    return generation_config

scheduler_params_list = [({"num_kv_blocks": 2, "block_size": 32, "dynamic_split_fuse": True, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_greedy()),
({"num_kv_blocks": 2, "block_size": 32, "dynamic_split_fuse": False, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_greedy()),
({"num_kv_blocks": 10, "block_size": 32, "dynamic_split_fuse": True}, get_greedy_seq_len_300()),
({"num_kv_blocks": 10, "block_size": 32, "dynamic_split_fuse": False}, get_greedy_seq_len_300()),
({"num_kv_blocks": 34, "block_size": 32, "dynamic_split_fuse": True, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_beam_search()),
({"num_kv_blocks": 34, "block_size": 32, "dynamic_split_fuse": False, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_beam_search())]
({"num_kv_blocks": 34, "block_size": 32, "dynamic_split_fuse": False, "max_num_batched_tokens": 256, "max_num_seqs": 256}, get_beam_search()),
({"num_kv_blocks": 100, "block_size": 32, "dynamic_split_fuse": True}, get_beam_search_seq_len_300()),
({"num_kv_blocks": 100, "block_size": 32, "dynamic_split_fuse": False}, get_beam_search_seq_len_300())]
@pytest.mark.parametrize("params", scheduler_params_list)
@pytest.mark.precommit
def test_preemption(tmp_path, params):
Expand Down Expand Up @@ -62,22 +82,21 @@ def test_preemption_with_multinomial(tmp_path, dynamic_split_fuse):
],
ref_texts=[
[
"\nI've seen this expression used too many times without making sense.\nAs an AI engineer, and as a scientist, we should all be looking"
"\nI've seen this expression used too many times without making sense.\nAs an AI engineer, and as a scientist, we should make everything easier"
],
[
' significance of 3862?\n3829\nWhat is the greatest common divisor of 15 and 7763?\n9\nCalculate the',
' third derivative of 939*v**3*r**2 + 133*v**3*r**2 + v**3 - 77*',
" climate in the future? Do we have things to catch on fire, and if so does that mean we'll have a new climate before we have"
' third derivative of 939*v**3*r**2 + 133*v**3*r**2 + v**3 - 16*',
" climate in the future? Do we have things to catch on fire, and if so does that mean we'll have a new climate change or is"
],
[
"\nIt's in the middle of nowhere if you haven’t seen one yet! It might be more convenient there than anywhere else 😊 we",
'\nUAE is a country with some great culture that has been living under Islamic oppression for almost 60 years now (including 20 years before) so no',
"\nI don't know anything. I'm not sure what kind this sub wants though... but apparently they are pretty bad at taking selfies too..",
'\nNope, just wanted to say how awesome and beautiful it was when my brother came back from an adventure trip across Asia - very much alive on'
"\nIt's in the middle of nowhere if you haven’t seen one yet! It might be more convenient there than anywhere else.. maybe take",
'\nUAE is a country with some great culture that has been living under Islamic oppression for almost 60 years now (including 20 years as part of Arab',
'\nNope, just wanted to say how awesome and beautiful it was when my brother came back from an adventure trip across Asia - our 2nd year',
'\nI don\'t know anything. I\'m not sure what kind this sub wants though... but apparently they are pretty bad at making videos/photos'
],
])

@pytest.mark.skip(reason="should be fixed by support of n seqs in preemption")
@pytest.mark.parametrize("dynamic_split_fuse", [True, False])
@pytest.mark.precommit
def test_preemption_with_multinomial_n_seq(tmp_path, dynamic_split_fuse):
Expand Down
Loading