Merge pull request openvinotoolkit#8 from dmatveev/dm/npuw_fix_fcfa
[MIRROR] NPUW: Various fixes for FUNCALL_FOR_ALL
dmatveev authored Oct 9, 2024
2 parents 5f69062 + 86843ef commit 0355114
Showing 8 changed files with 69 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/plugins/intel_npu/src/plugin/npuw/compiled_model.cpp
@@ -288,6 +288,7 @@ ov::npuw::CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
m_compiled_submodels[id].closure = subgraph._closure;
m_compiled_submodels[id].scales = subgraph._scales;
m_compiled_submodels[id].zerops = subgraph._zerops;
+ m_compiled_submodels[id].forced_to_fcall = subgraph._forced_to_fcall;
m_compiled_submodels[id].update_required.resize(subgraph._closure.size(), false);
fill_weights_bank(id);
} // if(!funcall)
2 changes: 2 additions & 0 deletions src/plugins/intel_npu/src/plugin/npuw/compiled_model.hpp
@@ -126,6 +126,8 @@ class CompiledModel : public ov::ICompiledModel {
std::vector<ov::Tensor> zerops;
std::vector<bool> update_required;

+ bool forced_to_fcall = false;

// FIXME: Take it out of structure
ov::SoPtr<ov::ICompiledModel> ref_compiled_model;
bool switched_to_ref = false;
34 changes: 22 additions & 12 deletions src/plugins/intel_npu/src/plugin/npuw/just_sync_infer_request.cpp
@@ -48,7 +48,8 @@ ov::npuw::JustInferRequest::JustInferRequest(const std::shared_ptr<ov::npuw::Com
if (comp_model_desc.replaced_by) {
// Pre-allocate output tensors for this function call
const auto real_idx = comp_model_desc.replaced_by.value();
- auto& proto_comp_model = m_npuw_model->m_compiled_submodels[real_idx].compiled_model;
+ auto& proto_comp_model_desc = m_npuw_model->m_compiled_submodels[real_idx];
+ auto& proto_comp_model = proto_comp_model_desc.compiled_model;
for (size_t out_idx = 0; out_idx < proto_comp_model->outputs().size(); out_idx++) {
const auto& port = proto_comp_model->outputs()[out_idx];
m_funcall_result[LinkFrom{i, out_idx}] =
@@ -63,12 +64,13 @@ ov::npuw::JustInferRequest::JustInferRequest(const std::shared_ptr<ov::npuw::Com
} // if(replaced_by)

// Special cases are handled -- so nothing to do here
+ const bool is_piped = is_pipelined(i);
bool recompiled = false;
- auto rqs = create_infer_requests(i, m_use_function_pipelining ? 2 : 1, &recompiled);
+ auto rqs = create_infer_requests(i, is_piped ? 2 : 1, &recompiled);
failover_happened |= recompiled;
m_subrequests[i] = rqs.at(0);
m_subrequest_devices[i] = *comp_model_desc.device_it;
- if (comp_model_desc.replaced_by && m_use_function_pipelining) {
+ if (is_piped) {
m_funcall_pipeline[i].subrequest = rqs.at(1);
}

@@ -89,6 +91,10 @@ ov::npuw::JustInferRequest::JustInferRequest(const std::shared_ptr<ov::npuw::Com
for (std::size_t i = 0; i < m_num_submodels; i++) {
auto& comp_model_desc = m_npuw_model->m_compiled_submodels[i];
if (comp_model_desc.replaced_by) { // a function call..
+ if (!is_pipelined(i)) {
+     LOG_INFO("Skip subgraph[" << i << "] as it is a single-call function");
+     continue;
+ }
// Use real_id to accumulate information about
// different functions
const auto real_id = comp_model_desc.replaced_by.value();
@@ -331,8 +337,6 @@ void ov::npuw::JustInferRequest::bind_global_parameters(std::size_t idx) {
auto& comp_model_desc = m_npuw_model->m_compiled_submodels[idx];
const auto real_idx = comp_model_desc.replaced_by.value_or(idx);

LOG_DEBUG("Real idx is..." << real_idx);

const bool do_copy = needs_copy(idx);
const auto& iodesc = m_subrequests_gio.at(idx);

@@ -341,7 +345,7 @@

// pick which subrequest we actually work on here
auto subr = [&]() {
- if (now_idx() && real_idx == real(now_idx().value()) && m_use_function_pipelining) {
+ if (now_idx() && real_idx == real(now_idx().value()) && is_pipelined(now_idx().value())) {
LOG_DEBUG("Accessing the pipeline subrequest");
// The real index of request we need to prepare IS
// the same request which executes now AND
@@ -458,7 +462,7 @@ void ov::npuw::JustInferRequest::function_prologue(std::size_t idx) {
// 2. Unpack the function closure -- right here, if pipelining is not enabled.
// If it is enabled, the flow is a little bit different - see run_subrequest_for_success()
// for details.
- if (!m_use_function_pipelining) {
+ if (!is_pipelined(idx)) {
LOG_DEBUG("Unpacking closures...");
LOG_BLOCK();
unpack_closure(idx, m_subrequests[real_idx]);
@@ -555,15 +559,16 @@ void ov::npuw::JustInferRequest::recreate_subrequests(std::size_t idx) {
auto& comp_model_desc = m_npuw_model->m_compiled_submodels[idx];
auto real_idx = comp_model_desc.replaced_by.value_or(idx);

- auto new_rqs = create_infer_requests(idx, m_use_function_pipelining ? 2 : 1);
+ const auto is_piped = is_pipelined(idx);
+ auto new_rqs = create_infer_requests(idx, is_piped ? 2 : 1);

// NB: Regardless of whether this subrequest was a function call
// or not, always use the real_idx here - for regular
// subrequests, real_id == idx, but for function calls it
// is critical here to update the function body, not the
// function calls (which are left empty now in the vector)
m_subrequests[real_idx] = new_rqs.at(0);
- if (comp_model_desc.replaced_by && m_use_function_pipelining) {
+ if (is_piped) {
m_funcall_pipeline[real_idx].subrequest = new_rqs.at(1);
}
// After an infer request is recreated, the internal cross-request
@@ -637,7 +642,7 @@ void ov::npuw::JustInferRequest::run_subrequest_for_success(std::size_t idx, boo

if (job_done) {
dump_output_tensors(idx); // FIXME: Called here unconditionally, need to refactor
- if (m_use_function_pipelining && m_funcall_pipeline[idx].next) {
+ if (is_pipelined(idx) && m_funcall_pipeline[idx].next) {
// Swap the next (pipelined, semi-prepared) infer request in the chain
// with the default (to be accessed next) one.
std::swap(m_subrequests[real_idx], m_funcall_pipeline[real_idx].subrequest);
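The swap above is the heart of the function pipelining: two infer requests alternate, one executing the current function call while the other is being prepared for the next one. A minimal standalone C++ sketch of this ping-pong pattern (hypothetical names, not the real NPUW request types):

#include <iostream>
#include <string>
#include <utility>

int main() {
    std::string active = "rq-A";   // executes the current function call
    std::string standby = "rq-B";  // prepared in parallel for the next call

    for (int call = 0; call < 3; call++) {
        std::cout << "call " << call << ": run " << active
                  << ", prepare " << standby << "\n";
        std::swap(active, standby);  // the prepared request becomes active
    }
    return 0;
}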
@@ -666,7 +671,7 @@ void ov::npuw::JustInferRequest::unsafe_run_this_prep_next(std::size_t idx, boo
// The next subgraph is a call to the same function...
// At this point, THIS infer request is already prepared.
// Run it, then prepare it again for the next entrance
- if (m_use_function_pipelining) {
+ if (is_pipelined(real_idx)) {
// function pipelining is here! and the next rq is ours.
NPUW_ASSERT(m_funcall_pipeline[idx].next.value() == next_idx);
during(this_subr, [&]() {
@@ -697,7 +702,7 @@
bind_global_parameters(next_idx);
next_prepared = true;
}
- if (m_use_function_pipelining && m_funcall_pipeline[idx].next) {
+ if (is_pipelined(idx) && m_funcall_pipeline[idx].next) {
const auto my_next_idx = m_funcall_pipeline[idx].next.value();
LOG_DEBUG("Unpacking closures for the NEXT subrequest[" << my_next_idx << "]...");
LOG_BLOCK();
@@ -745,3 +750,8 @@ bool ov::npuw::JustInferRequest::supports_async_pipeline() const {
void ov::npuw::JustInferRequest::update_subrequest_links(std::size_t) {
connect_subrequests();
}

+ bool ov::npuw::JustInferRequest::is_pipelined(std::size_t idx) const {
+     const auto& desc = m_npuw_model->m_compiled_submodels[real(idx)];
+     return m_use_function_pipelining && desc.replaced_by && !desc.forced_to_fcall;
+ }
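The new is_pipelined() helper replaces the scattered m_use_function_pipelining checks with a single predicate: pipelining applies only when the global option is on, the subgraph belongs to a function, and the function was not forced into a single-call funcall. A self-contained sketch with mock types (hypothetical, not the real CompiledModel structures) illustrating the predicate:

#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

// Mock per-submodel descriptor, for illustration only.
struct SubmodelDesc {
    std::optional<std::size_t> replaced_by;  // set => part of a function group
    bool forced_to_fcall = false;            // set => single-call function
};

struct Model {
    bool use_function_pipelining = true;  // global pipelining option
    std::vector<SubmodelDesc> submodels;

    std::size_t real(std::size_t idx) const {
        return submodels[idx].replaced_by.value_or(idx);
    }
    // Mirrors the predicate above: global flag AND function call AND not forced.
    bool is_pipelined(std::size_t idx) const {
        const auto& d = submodels[real(idx)];
        return use_function_pipelining && d.replaced_by && !d.forced_to_fcall;
    }
};

int main() {
    Model m;
    m.submodels = {
        {0, false},             // 0: body of a repeated function (points to itself)
        {0, false},             // 1: call to function 0 -> pipelined
        {2, true},              // 2: forced single-call function -> not pipelined
        {std::nullopt, false},  // 3: regular subgraph -> not pipelined
    };
    for (std::size_t i = 0; i < m.submodels.size(); i++) {
        std::cout << "subgraph[" << i << "] pipelined: " << m.is_pipelined(i) << "\n";
    }
    return 0;
}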
src/plugins/intel_npu/src/plugin/npuw/just_sync_infer_request.hpp
@@ -64,6 +64,7 @@ class JustInferRequest final : public IBaseInferRequest {
using TensorPtr = ov::SoPtr<ov::ITensor>;
std::map<LinkFrom, TensorPtr> m_funcall_result;

+ bool is_pipelined(std::size_t idx) const;
bool m_use_function_pipelining = false;
struct FuncallPipeline {
// A "brother" subrequest for a "primary" subrequest. Initialized only
src/plugins/intel_npu/src/plugin/npuw/partitioning/partitioning.cpp
@@ -355,6 +355,7 @@ void Partitioner::identifySubgraphs() {
group_nodes.insert(it->second);
}
group.sg._repeated_id = group.repeated_id;
+ group.sg._forced_to_fcall = group.forced_to_fcall;
group.sg._gflops = group.gflops;
group.sg._ops = group.all_layers.size();
P.total_ops += group.sg._ops;
@@ -1450,6 +1451,7 @@ void Partitioner::createFunction(FunctionPipeline& func_ggg) {
funcall._gflops = body_sg._gflops; // preserving this is required for proper stats
funcall._ops = body_sg._ops; // preserving this is required for proper stats
funcall._avoid_list = body_sg._avoid_list;
+ funcall._forced_to_fcall = body_sg._forced_to_fcall;

// Declare a new function AND record a function call
ov::npuw::Function function;
@@ -1549,6 +1551,7 @@ void Partitioner::matchRepeatedSubgraphs(const std::string& func_name) {
funcall._gflops = this_sg._gflops; // duplicated code again!
funcall._ops = this_sg._ops; // duplicated code again!
funcall._avoid_list = this_sg._avoid_list; // duplicated code again!
+ funcall._forced_to_fcall = this_sg._forced_to_fcall;
rearrange_to_function_protocol(this_sg, body_params, funcall._parameters, func_ggg.param_call_to_proto);
rearrange_to_function_protocol(this_sg, body_results, funcall._results, func_ggg.result_call_to_proto);

@@ -1882,7 +1885,7 @@ void Partitioner::decompressionCutOff(const std::string& func_name) {
}

// Finally, remove the function body's parameters here
- ov::npuw::patterns::finalize_remap(f, closure_remap);
+ ov::npuw::patterns::finalize_remap(f, func_group.refs.front(), closure_remap);
} // if (CAST_SCALE && have(params_to_scale))
}
LOG_DEBUG("Function model inputs after the DCOFF:");
@@ -2017,6 +2020,8 @@ ov::npuw::Partitioning ov::npuw::getPartitioning(const std::shared_ptr<ov::Model
LOG_INFO("Turning block " << gid << " into a function " << this_group.repeated_id << "...");
LOG_BLOCK();
this_group.repeated_id = std::move(new_id);
+ this_group.forced_to_fcall = true;

ov::npuw::RepeatedBlock this_block;
for (const auto& layer : this_group.all_layers) {
// Note: NOT move(layer)! It breaks the code here.
src/plugins/intel_npu/src/plugin/npuw/partitioning/partitioning.hpp
@@ -35,12 +35,14 @@ struct Subgraph {
//
// FIXME: Replace with variant or some other proper way (maybe
// even a class hierarchy)
- std::string _repeated_id;
- std::string _funcall;
+ std::string _repeated_id;  // FIXME: What's the difference
+ std::string _funcall;      // ..between these two?
std::vector<ov::Tensor> _closure;
std::vector<ov::Tensor> _scales; // Scale coeffs for manual unpacking
std::vector<ov::Tensor> _zerops; // Zero points for manual unpacking

+ bool _forced_to_fcall = false;

struct Gather {
// NB.: int64_t is strange but it is used by OV to refer to parameters
int64_t dst_idx = -1;
@@ -72,6 +74,11 @@ struct Group {

std::string avoid_list;

+ // Set to true if the Group was forcibly turned into a function. Such
+ // a function has just a single associated funcall and is subject to
+ // some optimizations (simplifications).
+ bool forced_to_fcall = false;

ov::npuw::Subgraph sg;
};

src/plugins/intel_npu/src/plugin/npuw/partitioning/patterns/dcoff.cpp
@@ -136,14 +136,40 @@ void apply_remap(Subgraph& fcall, const ClosureRemap& m) {
fcall._zerops = std::move(new_zerops);
}

- void finalize_remap(Function& fbody, const ClosureRemap& m) {
+ void finalize_remap(Function& fbody, Subgraph& fsg, const ClosureRemap& m) {
LOG_DEBUG("Removing retired parameters...");
LOG_BLOCK();

+ // Unfortunate truth: this function has to be aware of the
+ // Host Gather's existence to properly update indices after
+ // the remap.
+ using PPtr = std::shared_ptr<ov::op::v0::Parameter>;
+ struct GatherParams {
+     PPtr pidx;  // Parameter @ function body - input_ids
+     PPtr psrc;  // Parameter @ function body - vocab tensor
+     PPtr pdst;  // Parameter @ function body - gathered ids
+ };
+ GatherParams gather_params;
+ const auto& params = fbody._model->get_parameters();
+ if (fsg._host_gather.dst_idx != -1) {
+     gather_params = GatherParams{params[fsg._host_gather.idx_idx],
+                                  params[fsg._host_gather.src_idx],
+                                  params[fsg._host_gather.dst_idx]};
+ }

for (auto&& p : m.params_to_remove) {
LOG_DEBUG("Removing parameter " << p);
LOG_BLOCK();
fbody._model->remove_parameter(p);
}

+ // Update indices for gather
+ if (fsg._host_gather.dst_idx != -1) {
+     fsg._host_gather.idx_idx = fbody._model->get_parameter_index(gather_params.pidx);
+     fsg._host_gather.src_idx = fbody._model->get_parameter_index(gather_params.psrc);
+     fsg._host_gather.dst_idx = fbody._model->get_parameter_index(gather_params.pdst);
+ }

fbody._model->validate_nodes_and_infer_types();
LOG_DEBUG("DONE");
}
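The reason for caching the three Parameter pointers before the removal loop and re-querying their indices afterwards: remove_parameter() shifts the positions of the remaining parameters, so any stored integer index may go stale. A tiny standalone sketch of that pitfall (hypothetical types; plain std containers stand in for the ov::Model parameter list):

#include <algorithm>
#include <cassert>
#include <memory>
#include <string>
#include <vector>

using Param = std::shared_ptr<std::string>;

// Analogue of get_parameter_index(): find a parameter's current position.
static int index_of(const std::vector<Param>& ps, const Param& p) {
    auto it = std::find(ps.begin(), ps.end(), p);
    return it == ps.end() ? -1 : static_cast<int>(it - ps.begin());
}

int main() {
    std::vector<Param> params = {
        std::make_shared<std::string>("weights_0"),  // retired, will be removed
        std::make_shared<std::string>("input_ids"),  // gather: idx
        std::make_shared<std::string>("vocab"),      // gather: src
        std::make_shared<std::string>("gathered"),   // gather: dst
    };

    // 1. Cache stable pointers BEFORE removal (like GatherParams above).
    Param pidx = params[1], psrc = params[2], pdst = params[3];

    // 2. Remove a retired parameter -- everything after it shifts left.
    params.erase(params.begin());

    // 3. The old indices (1, 2, 3) are now stale; re-query instead.
    assert(index_of(params, pidx) == 0);
    assert(index_of(params, psrc) == 1);
    assert(index_of(params, pdst) == 2);
    return 0;
}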
src/plugins/intel_npu/src/plugin/npuw/partitioning/patterns/dcoff.hpp
@@ -46,7 +46,7 @@ struct ClosureRemap {

ClosureRemap build_remap(const Function& fbody, const DCOFFParams& p);
void apply_remap(Subgraph& fcall, const ClosureRemap& m);
- void finalize_remap(Function& fbody, const ClosureRemap& m);
+ void finalize_remap(Function& fbody, Subgraph& fsg, const ClosureRemap& m);

// Various patterns here

