Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[clflush] Enable x86 cpu cache flush #5914

Merged
merged 8 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions python/tvm/autotvm/measure/measure_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,14 @@ class RPCRunner(Runner):
Whether check correctness after measurement. This will use llvm cpu target to
call your template and get the reference output.
This can work for TOPI templates, but may not work for your custom template.
enable_cpu_cache_flush: bool
Whether to enable cpu cache flush
"""
def __init__(self,
key, host, port, priority=1,
timeout=10, n_parallel=None,
number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1,
check_correctness=False):
check_correctness=False, enable_cpu_cache_flush=False):
super(RPCRunner, self).__init__(timeout, n_parallel)

self.key = key
Expand All @@ -200,6 +202,7 @@ def __init__(self,

self.ref_input = None
self.ref_output = None
self.enable_cpu_cache_flush = enable_cpu_cache_flush
self.check_correctness = check_correctness
self.cooldown_interval = cooldown_interval

Expand Down Expand Up @@ -267,7 +270,8 @@ def run(self, measure_inputs, build_results):
self.cooldown_interval,
remote_args,
self.ref_input,
self.ref_output)
self.ref_output,
self.enable_cpu_cache_flush)
futures.append(ret)

for future in futures:
Expand Down Expand Up @@ -309,7 +313,8 @@ class LocalRunner(RPCRunner):
Whether check correctness after measurement. This will use llvm cpu target to
call your template and get the reference output.
This can work for TOPI templates, but may not work for your custom template.

enable_cpu_cache_flush: bool
Whether to enable cpu cache flush
FrozenGene marked this conversation as resolved.
Show resolved Hide resolved
Note
----
This is a "fake" local mode. We start a silent rpc tracker and rpc server
Expand All @@ -318,13 +323,14 @@ class LocalRunner(RPCRunner):
def __init__(self,
timeout=10,
number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1,
check_correctness=False):
check_correctness=False, enable_cpu_cache_flush=False):
super(LocalRunner, self).__init__('', None, None, 0,
timeout=timeout, n_parallel=1,
number=number, repeat=repeat,
min_repeat_ms=min_repeat_ms,
cooldown_interval=cooldown_interval,
check_correctness=check_correctness)
check_correctness=check_correctness,
enable_cpu_cache_flush=enable_cpu_cache_flush)
self.tracker = None
self.server = None

Expand Down Expand Up @@ -421,7 +427,8 @@ def _wrapped(measure_input, tmp_dir, **kwargs):

def run_through_rpc(measure_input, build_result,
number, repeat, min_repeat_ms, cooldown_interval,
remote_args, ref_input=None, ref_output=None):
remote_args, ref_input=None, ref_output=None,
enable_cpu_cache_flush=False):
"""Run a generated library through rpc

Parameters
Expand Down Expand Up @@ -454,6 +461,8 @@ def run_through_rpc(measure_input, build_result,
The reference input used for checking correctness
ref_output: List of np.ndarray
The reference output used for checking correctness
enable_cpu_cache_flush: bool
Whether to enable cpu cache flush
FrozenGene marked this conversation as resolved.
Show resolved Hide resolved
"""
if isinstance(build_result, MeasureResult):
return build_result
Expand All @@ -473,8 +482,10 @@ def run_through_rpc(measure_input, build_result,
remote.upload(build_result.filename)
func = remote.load_module(os.path.split(build_result.filename)[1])
ctx = remote.context(str(measure_input.target), 0)
f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else ''
FrozenGene marked this conversation as resolved.
Show resolved Hide resolved
time_f = func.time_evaluator(
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms,
f_prepare=f_prepare)

# set input
if ref_input:
Expand Down
6 changes: 4 additions & 2 deletions python/tvm/runtime/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def save(self, file_name, fmt=""):
"""
_ffi_api.ModuleSaveToFile(self, file_name, fmt)

def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0, f_prepare=''):
FrozenGene marked this conversation as resolved.
Show resolved Hide resolved
"""Get an evaluator that measures time cost of running function.

Parameters
Expand Down Expand Up @@ -192,6 +192,8 @@ def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
minimum duration requirement of one `repeat`.
i.e., When the run time of one `repeat` falls below this time, the `number` parameter
will be automatically increased.
f_prepare: str, optional
The prepared function name we want to execute before executing the time evaluator.

Note
----
Expand All @@ -207,7 +209,7 @@ def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
try:
feval = _ffi_api.RPCTimeEvaluator(
self, func_name, ctx.device_type, ctx.device_id,
number, repeat, min_repeat_ms)
number, repeat, min_repeat_ms, f_prepare)

def evaluator(*args):
"""Internal wrapped evaluator."""
Expand Down
81 changes: 71 additions & 10 deletions src/runtime/rpc/rpc_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

#include <cstring>
#include <memory>
#if defined(_M_X64) || defined(__x86_64__)
#include <immintrin.h>
#endif

#include "rpc_endpoint.h"
#include "rpc_session.h"
Expand Down Expand Up @@ -183,7 +186,7 @@ class RPCModuleNode final : public ModuleNode {
}

PackedFunc GetTimeEvaluator(const std::string& name, TVMContext ctx, int number, int repeat,
int min_repeat_ms) {
int min_repeat_ms, const std::string& f_prepare_name) {
InitRemoteFunc(&remote_get_time_evaluator_, "runtime.RPCTimeEvaluator");
// Remove session mask because we pass ctx by parts.
int dev_type = ctx.device_type;
Expand All @@ -194,11 +197,11 @@ class RPCModuleNode final : public ModuleNode {
if (module_handle_ != nullptr) {
return remote_get_time_evaluator_(GetRef<Module>(this), name,
static_cast<int>(ctx.device_type), ctx.device_id, number,
repeat, min_repeat_ms);
repeat, min_repeat_ms, f_prepare_name);
} else {
return remote_get_time_evaluator_(Optional<Module>(nullptr), name,
static_cast<int>(ctx.device_type), ctx.device_id, number,
repeat, min_repeat_ms);
repeat, min_repeat_ms, f_prepare_name);
}
}

Expand Down Expand Up @@ -236,7 +239,7 @@ class RPCModuleNode final : public ModuleNode {
// The local channel
std::shared_ptr<RPCSession> sess_;
// remote function to get time evaluator
TypedPackedFunc<PackedFunc(Optional<Module>, std::string, int, int, int, int, int)>
TypedPackedFunc<PackedFunc(Optional<Module>, std::string, int, int, int, int, int, std::string)>
remote_get_time_evaluator_;
// remote function getter for modules.
TypedPackedFunc<PackedFunc(Module, std::string, bool)> remote_mod_get_function_;
Expand Down Expand Up @@ -300,8 +303,43 @@ std::shared_ptr<RPCSession> RPCModuleGetSession(Module mod) {
return rmod->sess();
}

/*!
* \brief Flush the cache.
* \param addr The address of data we want to flush
* \param len The length of data
*/
/*
* When we are in the tuning of TVM, we will make TVM occupy
* the cache fully and doesn't flush it during iteration.
* This has problems then in e2e testing, since arrays that
* we assume exist in cache (ie. weights) are evicted during e2e runs,
* which leads to lower performance.
*/
inline void CPUCacheFlushImpl(const char* addr, unsigned int len) {
// TODO(FrozenGene): Support ARM.
#if (defined(_M_X64) || defined(__x86_64__))
const size_t cache_line = 64;
if (addr == nullptr || len <= 0) {
return;
}

for (uintptr_t uptr = (uintptr_t)addr & ~(cache_line - 1); uptr < (uintptr_t)addr + len;
uptr += cache_line) {
_mm_clflush(reinterpret_cast<const void*>(uptr));
}

#endif
}

inline void CPUCacheFlush(int begin_index, const TVMArgs& args) {
for (int i = begin_index; i < args.size(); i++) {
CPUCacheFlushImpl(static_cast<char*>((args[i].operator DLTensor*()->data)),
GetDataSize(*(args[i].operator DLTensor*())));
}
}

PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repeat,
int min_repeat_ms) {
int min_repeat_ms, PackedFunc f_prepare) {
CHECK(pf != nullptr);

if (static_cast<int>(ctx.device_type) == static_cast<int>(kDLMicroDev)) {
Expand All @@ -310,7 +348,8 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
return (*get_micro_time_evaluator)(pf, ctx, number, repeat);
}

auto ftimer = [pf, ctx, number, repeat, min_repeat_ms](TVMArgs args, TVMRetValue* rv) mutable {
auto ftimer = [pf, ctx, number, repeat, min_repeat_ms, f_prepare](TVMArgs args,
TVMRetValue* rv) mutable {
TVMRetValue temp;
std::ostringstream os;
// skip first time call, to activate lazy compilation components.
Expand All @@ -319,6 +358,9 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
DeviceAPI::Get(ctx)->StreamSync(ctx, nullptr);

for (int i = 0; i < repeat; ++i) {
if (f_prepare != nullptr) {
f_prepare.CallPacked(args, &temp);
}
std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::nanoseconds> tbegin,
tend;
double duration_ms = 0.0;
Expand Down Expand Up @@ -358,7 +400,7 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe

TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator")
.set_body_typed([](Optional<Module> opt_mod, std::string name, int device_type, int device_id,
int number, int repeat, int min_repeat_ms) {
int number, int repeat, int min_repeat_ms, std::string f_prepare_name) {
TVMContext ctx;
ctx.device_type = static_cast<DLDeviceType>(device_type);
ctx.device_id = device_id;
Expand All @@ -367,17 +409,36 @@ TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator")
std::string tkey = m->type_key();
if (tkey == "rpc") {
return static_cast<RPCModuleNode*>(m.operator->())
->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms);
->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms, f_prepare_name);
} else {
return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms);
PackedFunc f_prepare;
if (!f_prepare_name.empty()) {
auto* pf_prepare = runtime::Registry::Get(f_prepare_name);
CHECK(pf_prepare != nullptr)
<< "Cannot find " << f_prepare_name << " in the global function";
f_prepare = *pf_prepare;
}
return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms,
f_prepare);
}
} else {
auto* pf = runtime::Registry::Get(name);
CHECK(pf != nullptr) << "Cannot find " << name << " in the global function";
return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms);
PackedFunc f_prepare;
if (!f_prepare_name.empty()) {
auto* pf_prepare = runtime::Registry::Get(f_prepare_name);
CHECK(pf_prepare != nullptr)
<< "Cannot find " << f_prepare_name << " in the global function";
f_prepare = *pf_prepare;
}
return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms, f_prepare);
}
});

TVM_REGISTER_GLOBAL("cache_flush_cpu_non_first_arg").set_body([](TVMArgs args, TVMRetValue* rv) {
CPUCacheFlush(1, args);
});

// server function registration.
TVM_REGISTER_GLOBAL("tvm.rpc.server.ImportModule").set_body_typed([](Module parent, Module child) {
parent->Import(child);
Expand Down
3 changes: 2 additions & 1 deletion src/runtime/rpc/rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,11 @@ struct RemoteSpace {
* minimum duration requirement of one `repeat`.
* i.e., When the run time of one `repeat` falls below this time,
* the `number` parameter will be automatically increased.
* \param f_prepare The function to be executed before we excetute time evaluator.
* \return f_timer A timer function.
*/
PackedFunc WrapTimeEvaluator(PackedFunc f, TVMContext ctx, int number, int repeat,
int min_repeat_ms);
int min_repeat_ms, PackedFunc f_prepare = nullptr);

/*!
* \brief Create a Global RPC module that refers to the session.
Expand Down
5 changes: 3 additions & 2 deletions tutorials/autotvm/tune_relay_x86.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ def get_network(name, batch_size):

'measure_option': autotvm.measure_option(
builder=autotvm.LocalBuilder(),
runner=autotvm.LocalRunner(number=10, repeat=1,
min_repeat_ms=1000),
runner=autotvm.LocalRunner(number=1, repeat=10,
min_repeat_ms=0,
enable_cpu_cache_flush=True),
FrozenGene marked this conversation as resolved.
Show resolved Hide resolved
),
}

Expand Down