Commit

…genai into at/static-llm-pipeline-advanced-sampling

TolyaTalamanov committed Dec 24, 2024
2 parents 55ead2d + db28c8c commit e01ccdf
Showing 45 changed files with 706 additions and 891 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/llm_bench-python.yml
@@ -151,7 +151,7 @@ jobs:
rm -rf ./ov_models/internvl2-1B
- name: WWB Tests
run: |
pip install git+https://github.com/huggingface/optimum-intel.git
pip install git+https://github.com/huggingface/optimum-intel.git@420fa87d039425a906b7f755e4562b65947f016a
GIT_CLONE_PROTECTION_ACTIVE=false PIP_PRE=1 PIP_EXTRA_INDEX_URL=https://storage.openvinotoolkit.org/simple/wheels/nightly pip install ${{ env.WWB_PATH }}
python -m pytest -v ${{ env.WWB_PATH }}/tests
stateful:
@@ -190,7 +190,7 @@ jobs:
- name: WWB Tests
run: |
pip install pytest
pip install git+https://github.com/huggingface/optimum-intel.git
pip install git+https://github.com/huggingface/optimum-intel.git@420fa87d039425a906b7f755e4562b65947f016a
GIT_CLONE_PROTECTION_ACTIVE=false PIP_PRE=1 PIP_EXTRA_INDEX_URL=https://storage.openvinotoolkit.org/simple/wheels/nightly pip install ${{ env.WWB_PATH }}
python -m pytest -v ${{ env.WWB_PATH }}/tests
6 changes: 5 additions & 1 deletion README.md
@@ -331,10 +331,14 @@ For more examples check out our [Generative AI workflow](https://docs.openvino.a
NOTE: Whisper Pipeline requires preprocessing of audio input (to adjust sampling rate and normalize)
### Converting and compressing image generation model from Hugging Face library
### Converting and quantizing speech-to-text model from Hugging Face library
```sh
#Download and convert to OpenVINO whisper-base model
optimum-cli export openvino --trust-remote-code --model openai/whisper-base whisper-base
#Download, convert and apply int8 static quantization to whisper-base model
optimum-cli export openvino --trust-remote-code --model openai/whisper-base \
--quant-mode int8 --dataset librispeech --num-samples 32 whisper-base-int8
```

### Run generation using Whisper Pipeline API in Python
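For reference, a minimal sketch of the Python usage this heading refers to — not part of this diff — assuming the openvino_genai WhisperPipeline API and librosa for resampling the input to 16 kHz mono; `sample.wav` and the `whisper-base` directory (produced by the optimum-cli export above) are placeholders:

```python
import librosa
import openvino_genai

def read_wav(filepath: str) -> list[float]:
    # Resample to 16 kHz mono, as required by Whisper preprocessing
    raw_speech, _sample_rate = librosa.load(filepath, sr=16000)
    return raw_speech.tolist()

# "whisper-base" is the directory produced by the optimum-cli export command above
pipe = openvino_genai.WhisperPipeline("whisper-base", "CPU")
print(pipe.generate(read_wav("sample.wav")))
```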
2 changes: 1 addition & 1 deletion samples/export-requirements.txt
@@ -2,7 +2,7 @@
--extra-index-url https://storage.openvinotoolkit.org/simple/wheels/pre-release
--extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly
openvino-tokenizers~=2025.0.0.0.dev
optimum-intel @ git+https://github.com/huggingface/optimum-intel.git
optimum-intel @ git+https://github.com/huggingface/optimum-intel.git@420fa87d039425a906b7f755e4562b65947f016a
numpy<2.0.0; sys_platform == 'darwin'
einops==0.8.0 # For Qwen
transformers_stream_generator==0.0.5 # For Qwen
81 changes: 50 additions & 31 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -22,7 +22,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
m_tokenizer = tokenizer;
m_generation_config = generation_config;
m_is_validation_mode_enabled = is_validation_mode_enabled;

ov::Core core;

auto [core_properties, compile_properties] = utils::split_core_compile_config(properties);
@@ -255,18 +255,6 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}
}, streamer);

OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
"Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding");

std::vector<GenerationHandle> generations;
for (size_t request_id = 0; request_id < input_ids.size(); ++request_id) {
OPENVINO_ASSERT(1 == input_ids[request_id].get_shape().at(0), "Use multiple tensors to pass a batch.");
generations.push_back(add_request(request_id, input_ids[request_id], sampling_params[request_id]));
}

std::vector<EncodedGenerationResult> results;
results.reserve(m_awaiting_requests.size());

auto drop_requests = [&] () {
for (const std::shared_ptr<ov::genai::SequenceGroup> request : m_requests) {
for (const auto& sequence: request->get_sequences()) {
@@ -279,25 +267,40 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
m_requests.clear();
};

OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && sampling_params[0].num_return_sequences == 1 &&
(sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
"Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding");

std::vector<GenerationHandle> generations;
for (size_t request_id = 0; request_id < input_ids.size(); ++request_id) {
OPENVINO_ASSERT(1 == input_ids[request_id].get_shape().at(0), "Use multiple tensors to pass a batch.");
generations.push_back(add_request(request_id, input_ids[request_id], sampling_params[request_id]));
}
auto all_requests = m_awaiting_requests; // we need to store all requests to get results from them once generation has finished

bool continue_generation = true;
while (has_non_finished_requests() && continue_generation) {
try {
step();
} catch (...) {
drop_requests();
drop_requests(); // remove all requests from pipeline state in case of exception
throw;
}
if (streamer_ptr && generations.at(0)->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> token = generations.at(0).get()->back();

auto & generation = generations.at(0);
if (streamer_ptr && generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
for (const auto& gen_token : token.begin()->second.generated_ids) {
if (!streamer_ptr->put(gen_token)) {
continue_generation = !streamer_ptr->put(gen_token);
if (!continue_generation) {
generation->drop();
break;
}
}
}
}

if (streamer_ptr) {
if (streamer_ptr) { // push streamer's cache
streamer_ptr->end();
}

@@ -307,16 +310,32 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
OPENVINO_ASSERT(m_requests.empty(), "Internal error: current request is supposed to be dropped within step() function as completed");
}

for (size_t generation_idx = 0; generation_idx < generations.size(); ++generation_idx) {
const auto& generation = generations[generation_idx];
std::vector<EncodedGenerationResult> results;
results.reserve(all_requests.size());

for (size_t request_id = 0; request_id < all_requests.size(); ++request_id) {
const auto& request = all_requests[request_id];
auto sampling_params = request->get_sampling_parameters();
const auto& sequences = request->get_finished_sequences();
size_t num_outputs = std::min(sampling_params.num_return_sequences, sequences.size());

EncodedGenerationResult result;
result.m_request_id = 1;
std::vector<GenerationOutput> generation_outputs = generation->read_all();
for (const auto& generation_output : generation_outputs) {
result.m_generation_ids.push_back(std::move(generation_output.generated_ids));
result.m_scores.push_back(generation_output.score);
result.m_request_id = request_id;
result.m_generation_ids.resize(num_outputs);
result.m_scores.resize(num_outputs);

for (size_t i = 0; i < num_outputs; ++i) {
const auto & sequence = sequences[i];
const float score = sampling_params.is_beam_search() ? sequence->get_beam_search_score(sampling_params) : sequence->get_cumulative_log_probs();
const auto & generated_ids = sequence->get_generated_ids();

if (sampling_params.echo)
result.m_generation_ids[i] = request->get_prompt_ids();
std::copy(generated_ids.begin(), generated_ids.end(), std::back_inserter(result.m_generation_ids[i]));
result.m_scores[i] = score;
}
result.m_status = generation->get_status();

result.m_status = generations[request_id]->get_status();
results.push_back(std::move(result));
}

@@ -408,7 +427,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::_fill_prompt_log_probs(
for (size_t sequence_group_id = 0, currently_processed_tokens = 0; sequence_group_id < sequence_groups.size(); ++sequence_group_id) {
SequenceGroup::Ptr sequence_group = sequence_groups[sequence_group_id];
// requests not scheduled, in decoding phase or not echoing are not processed
if (!sequence_group->is_scheduled() || sequence_group->get_context_len() > sequence_group->get_prompt_len() ||
if (!sequence_group->is_scheduled() || sequence_group->get_context_len() > sequence_group->get_prompt_len() ||
!sequence_group->get_sampling_parameters().echo)
continue;

Expand All @@ -421,10 +440,10 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::_fill_prompt_log_probs(

size_t num_prompt_tokens_processed = sequence_group->get_num_processed_tokens();
OPENVINO_ASSERT(num_prompt_tokens_processed + actual_seq_len <= sequence_group->get_prompt_len());

// if we processed the whole prompt we don't include last logprob as it will be processed by the sampler (it's already completion)
// otherwise we include it as it will be used in the next part of the prompt
int exclude_last_logprob = 1;
// otherwise we include it as it will be used in the next part of the prompt
int exclude_last_logprob = 1;
if (num_prompt_tokens_processed + actual_seq_len < sequence_group->get_prompt_len())
exclude_last_logprob = 0;

Expand All @@ -435,7 +454,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::_fill_prompt_log_probs(
for (int token_logits_offset = 0, token_id_offset = num_prompt_tokens_processed + 1;
token_logits_offset < actual_seq_len - exclude_last_logprob;
token_logits_offset++, token_id_offset++) {

const float* token_logits = (sequence_group_logits_data + token_logits_offset * vocab_size);
int64_t token_id = sequence_group->get_prompt_ids()[token_id_offset];
float token_logit = token_logits[token_id];
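The streaming hunks above change early-stop semantics: when the streamer asks to stop, the generation handle is now dropped inside the pipeline instead of only breaking out of the read loop. A rough Python-level sketch of how such a streamer is driven, assuming the openvino_genai LLMPipeline API; the model directory and the stop condition are placeholders:

```python
import openvino_genai

def streamer(subword: str) -> bool:
    print(subword, end="", flush=True)
    # Returning True requests early stop; with this change the underlying
    # request is dropped inside the pipeline as well.
    return "." in subword  # hypothetical stop condition: stop at the first period

pipe = openvino_genai.LLMPipeline("TinyLlama-1.1B-Chat-v1.0", "CPU")  # placeholder model dir
result = pipe.generate("What is OpenVINO?", max_new_tokens=100, streamer=streamer)
print()
print(result)
```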
2 changes: 1 addition & 1 deletion src/cpp/src/generation_handle.cpp
@@ -17,7 +17,7 @@ GenerationStatus GenerationHandleImpl::get_status() {
}

bool GenerationHandleImpl::can_read() {
return !is_dropped() && m_generation_stream->can_read();
return !is_dropped() && m_generation_stream->can_read();
}

bool GenerationHandleImpl::is_dropped() {
5 changes: 2 additions & 3 deletions src/cpp/src/generation_stream.hpp
@@ -14,8 +14,6 @@ class GenerationStream {
GenerationStatus m_status = GenerationStatus::RUNNING;
SynchronizedQueue<GenerationOutputs> m_output_queue;

std::vector<uint64_t> last_sequence_ids;

public:
using Ptr = std::shared_ptr<GenerationStream>;

Expand All @@ -30,10 +28,11 @@ class GenerationStream {
m_output_queue.push(std::move(outputs));
}

// Retrieving vector of pairs <sequence_id, token_id> as we can generate multiple outputs for a single prompt
// Retrieving vector of pairs <sequence_id, token_ids> as we can generate multiple outputs for a single prompt
GenerationOutputs back() {
return m_output_queue.back();
}

GenerationOutputs read() {
return m_output_queue.pull();
}