diff --git a/python/tvm/autotvm/measure/measure_methods.py b/python/tvm/autotvm/measure/measure_methods.py index b8969f55c00a..03fc09a2538e 100644 --- a/python/tvm/autotvm/measure/measure_methods.py +++ b/python/tvm/autotvm/measure/measure_methods.py @@ -180,12 +180,18 @@ class RPCRunner(Runner): Whether check correctness after measurement. This will use llvm cpu target to call your template and get the reference output. This can work for TOPI templates, but may not work for your custom template. + enable_cpu_cache_flush: bool + Whether to flush cache on CPU between repeated measurements. + Flushing cache can make the measured latency of one operator closer to + its actual latency during end-to-end inference. + To make this option effective, the argument `number` should also be set to 1. + This is only has effect on CPU task. """ def __init__(self, key, host, port, priority=1, timeout=10, n_parallel=None, number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1, - check_correctness=False): + check_correctness=False, enable_cpu_cache_flush=False): super(RPCRunner, self).__init__(timeout, n_parallel) self.key = key @@ -200,6 +206,7 @@ def __init__(self, self.ref_input = None self.ref_output = None + self.enable_cpu_cache_flush = enable_cpu_cache_flush self.check_correctness = check_correctness self.cooldown_interval = cooldown_interval @@ -267,7 +274,8 @@ def run(self, measure_inputs, build_results): self.cooldown_interval, remote_args, self.ref_input, - self.ref_output) + self.ref_output, + self.enable_cpu_cache_flush) futures.append(ret) for future in futures: @@ -309,7 +317,12 @@ class LocalRunner(RPCRunner): Whether check correctness after measurement. This will use llvm cpu target to call your template and get the reference output. This can work for TOPI templates, but may not work for your custom template. - + enable_cpu_cache_flush: bool + Whether to flush cache on CPU between repeated measurements. + Flushing cache can make the measured latency of one operator closer to + its actual latency during end-to-end inference. + To make this option effective, the argument `number` should also be set to 1. + This is only has effect on CPU task. Note ---- This is a "fake" local mode. We start a silent rpc tracker and rpc server @@ -318,13 +331,14 @@ class LocalRunner(RPCRunner): def __init__(self, timeout=10, number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1, - check_correctness=False): + check_correctness=False, enable_cpu_cache_flush=False): super(LocalRunner, self).__init__('', None, None, 0, timeout=timeout, n_parallel=1, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms, cooldown_interval=cooldown_interval, - check_correctness=check_correctness) + check_correctness=check_correctness, + enable_cpu_cache_flush=enable_cpu_cache_flush) self.tracker = None self.server = None @@ -421,7 +435,8 @@ def _wrapped(measure_input, tmp_dir, **kwargs): def run_through_rpc(measure_input, build_result, number, repeat, min_repeat_ms, cooldown_interval, - remote_args, ref_input=None, ref_output=None): + remote_args, ref_input=None, ref_output=None, + enable_cpu_cache_flush=False): """Run a generated library through rpc Parameters @@ -454,6 +469,12 @@ def run_through_rpc(measure_input, build_result, The reference input used for checking correctness ref_output: List of np.ndarray The reference output used for checking correctness + enable_cpu_cache_flush: bool + Whether to flush cache on CPU between repeated measurements. + Flushing cache can make the measured latency of one operator closer to + its actual latency during end-to-end inference. + To make this option effective, the argument `number` should also be set to 1. + This is only has effect on CPU task. """ if isinstance(build_result, MeasureResult): return build_result @@ -473,8 +494,16 @@ def run_through_rpc(measure_input, build_result, remote.upload(build_result.filename) func = remote.load_module(os.path.split(build_result.filename)[1]) ctx = remote.context(str(measure_input.target), 0) + + # Limitation: + # We can not get PackFunction directly in the remote mode as it is wrapped + # under the std::function. We could lift the restriction later once we fold + # the PackedFunc as an object. Currently, we pass function name to work + # around it. + f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else '' time_f = func.time_evaluator( - func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms) + func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms, + f_preproc=f_prepare) # set input if ref_input: diff --git a/python/tvm/runtime/module.py b/python/tvm/runtime/module.py index 3cdb28f8c496..754bb6f7dcef 100644 --- a/python/tvm/runtime/module.py +++ b/python/tvm/runtime/module.py @@ -163,7 +163,7 @@ def save(self, file_name, fmt=""): """ _ffi_api.ModuleSaveToFile(self, file_name, fmt) - def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0): + def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0, f_preproc=''): """Get an evaluator that measures time cost of running function. Parameters @@ -192,6 +192,8 @@ def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0): minimum duration requirement of one `repeat`. i.e., When the run time of one `repeat` falls below this time, the `number` parameter will be automatically increased. + f_preproc: str, optional + The preprocess function name we want to execute before executing the time evaluator. Note ---- @@ -207,7 +209,7 @@ def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0): try: feval = _ffi_api.RPCTimeEvaluator( self, func_name, ctx.device_type, ctx.device_id, - number, repeat, min_repeat_ms) + number, repeat, min_repeat_ms, f_preproc) def evaluator(*args): """Internal wrapped evaluator.""" diff --git a/src/runtime/rpc/rpc_module.cc b/src/runtime/rpc/rpc_module.cc index 89f3e7c6c7f8..d1eb89164fb7 100644 --- a/src/runtime/rpc/rpc_module.cc +++ b/src/runtime/rpc/rpc_module.cc @@ -26,6 +26,9 @@ #include #include +#if defined(_M_X64) || defined(__x86_64__) +#include +#endif #include "rpc_endpoint.h" #include "rpc_session.h" @@ -183,7 +186,7 @@ class RPCModuleNode final : public ModuleNode { } PackedFunc GetTimeEvaluator(const std::string& name, TVMContext ctx, int number, int repeat, - int min_repeat_ms) { + int min_repeat_ms, const std::string& f_preproc_name) { InitRemoteFunc(&remote_get_time_evaluator_, "runtime.RPCTimeEvaluator"); // Remove session mask because we pass ctx by parts. int dev_type = ctx.device_type; @@ -194,11 +197,11 @@ class RPCModuleNode final : public ModuleNode { if (module_handle_ != nullptr) { return remote_get_time_evaluator_(GetRef(this), name, static_cast(ctx.device_type), ctx.device_id, number, - repeat, min_repeat_ms); + repeat, min_repeat_ms, f_preproc_name); } else { return remote_get_time_evaluator_(Optional(nullptr), name, static_cast(ctx.device_type), ctx.device_id, number, - repeat, min_repeat_ms); + repeat, min_repeat_ms, f_preproc_name); } } @@ -236,7 +239,7 @@ class RPCModuleNode final : public ModuleNode { // The local channel std::shared_ptr sess_; // remote function to get time evaluator - TypedPackedFunc, std::string, int, int, int, int, int)> + TypedPackedFunc, std::string, int, int, int, int, int, std::string)> remote_get_time_evaluator_; // remote function getter for modules. TypedPackedFunc remote_mod_get_function_; @@ -300,8 +303,43 @@ std::shared_ptr RPCModuleGetSession(Module mod) { return rmod->sess(); } +/*! + * \brief Flush the cache. + * \param addr The address of data we want to flush + * \param len The length of data + */ +/* + * When we are in the tuning of TVM, we will make TVM occupy + * the cache fully and doesn't flush it during iteration. + * This has problems then in e2e testing, since arrays that + * we assume exist in cache (ie. weights) are evicted during e2e runs, + * which leads to lower performance. + */ +inline void CPUCacheFlushImpl(const char* addr, unsigned int len) { +// TODO(FrozenGene): Support ARM. +#if (defined(_M_X64) || defined(__x86_64__)) + const size_t cache_line = 64; + if (addr == nullptr || len <= 0) { + return; + } + + for (uintptr_t uptr = (uintptr_t)addr & ~(cache_line - 1); uptr < (uintptr_t)addr + len; + uptr += cache_line) { + _mm_clflush(reinterpret_cast(uptr)); + } + +#endif +} + +inline void CPUCacheFlush(int begin_index, const TVMArgs& args) { + for (int i = begin_index; i < args.size(); i++) { + CPUCacheFlushImpl(static_cast((args[i].operator DLTensor*()->data)), + GetDataSize(*(args[i].operator DLTensor*()))); + } +} + PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repeat, - int min_repeat_ms) { + int min_repeat_ms, PackedFunc f_preproc) { CHECK(pf != nullptr); if (static_cast(ctx.device_type) == static_cast(kDLMicroDev)) { @@ -310,7 +348,8 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe return (*get_micro_time_evaluator)(pf, ctx, number, repeat); } - auto ftimer = [pf, ctx, number, repeat, min_repeat_ms](TVMArgs args, TVMRetValue* rv) mutable { + auto ftimer = [pf, ctx, number, repeat, min_repeat_ms, f_preproc](TVMArgs args, + TVMRetValue* rv) mutable { TVMRetValue temp; std::ostringstream os; // skip first time call, to activate lazy compilation components. @@ -319,6 +358,9 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe DeviceAPI::Get(ctx)->StreamSync(ctx, nullptr); for (int i = 0; i < repeat; ++i) { + if (f_preproc != nullptr) { + f_preproc.CallPacked(args, &temp); + } std::chrono::time_point tbegin, tend; double duration_ms = 0.0; @@ -358,7 +400,7 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator") .set_body_typed([](Optional opt_mod, std::string name, int device_type, int device_id, - int number, int repeat, int min_repeat_ms) { + int number, int repeat, int min_repeat_ms, std::string f_preproc_name) { TVMContext ctx; ctx.device_type = static_cast(device_type); ctx.device_id = device_id; @@ -367,17 +409,36 @@ TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator") std::string tkey = m->type_key(); if (tkey == "rpc") { return static_cast(m.operator->()) - ->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms); + ->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms, f_preproc_name); } else { - return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms); + PackedFunc f_preproc; + if (!f_preproc_name.empty()) { + auto* pf_preproc = runtime::Registry::Get(f_preproc_name); + CHECK(pf_preproc != nullptr) + << "Cannot find " << f_preproc_name << " in the global function"; + f_preproc = *pf_preproc; + } + return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms, + f_preproc); } } else { auto* pf = runtime::Registry::Get(name); CHECK(pf != nullptr) << "Cannot find " << name << " in the global function"; - return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms); + PackedFunc f_preproc; + if (!f_preproc_name.empty()) { + auto* pf_preproc = runtime::Registry::Get(f_preproc_name); + CHECK(pf_preproc != nullptr) + << "Cannot find " << f_preproc_name << " in the global function"; + f_preproc = *pf_preproc; + } + return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms, f_preproc); } }); +TVM_REGISTER_GLOBAL("cache_flush_cpu_non_first_arg").set_body([](TVMArgs args, TVMRetValue* rv) { + CPUCacheFlush(1, args); +}); + // server function registration. TVM_REGISTER_GLOBAL("tvm.rpc.server.ImportModule").set_body_typed([](Module parent, Module child) { parent->Import(child); diff --git a/src/runtime/rpc/rpc_session.h b/src/runtime/rpc/rpc_session.h index 6a7e6d6e41c1..954c5b4ead22 100644 --- a/src/runtime/rpc/rpc_session.h +++ b/src/runtime/rpc/rpc_session.h @@ -324,10 +324,11 @@ struct RemoteSpace { * minimum duration requirement of one `repeat`. * i.e., When the run time of one `repeat` falls below this time, * the `number` parameter will be automatically increased. + * \param f_preproc The function to be executed before we excetute time evaluator. * \return f_timer A timer function. */ PackedFunc WrapTimeEvaluator(PackedFunc f, TVMContext ctx, int number, int repeat, - int min_repeat_ms); + int min_repeat_ms, PackedFunc f_preproc = nullptr); /*! * \brief Create a Global RPC module that refers to the session. diff --git a/tutorials/autotvm/tune_relay_x86.py b/tutorials/autotvm/tune_relay_x86.py index dcc5b25c8288..92fdafb056d1 100644 --- a/tutorials/autotvm/tune_relay_x86.py +++ b/tutorials/autotvm/tune_relay_x86.py @@ -114,6 +114,11 @@ def get_network(name, batch_size): # We will use local mode for tuning configuration. RPC tracker # mode can be setup similarly to the approach in # :ref:`tune_relay_arm` tutorial. +# +# To perform a precise measurement, we should repeat the measurement several +# times and use the average of results. In addition, we need to flush the cache +# for the weight tensors between repeated measurements. This can make the measured +# latency of one operator closer to its actual latency during end-to-end inference. tuning_option = { 'log_filename': log_file, @@ -122,8 +127,9 @@ def get_network(name, batch_size): 'measure_option': autotvm.measure_option( builder=autotvm.LocalBuilder(), - runner=autotvm.LocalRunner(number=10, repeat=1, - min_repeat_ms=1000), + runner=autotvm.LocalRunner(number=1, repeat=10, + min_repeat_ms=0, + enable_cpu_cache_flush=True), ), }