diff --git a/neural_compressor/common/utils/utility.py b/neural_compressor/common/utils/utility.py index 56d6d22d5c9..b169d418b68 100644 --- a/neural_compressor/common/utils/utility.py +++ b/neural_compressor/common/utils/utility.py @@ -15,8 +15,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import importlib +import subprocess import time +import cpuinfo +import psutil + from neural_compressor.common.utils import TuningLogger, logger __all__ = [ @@ -26,9 +31,127 @@ "set_tensorboard", "dump_elapsed_time", "log_quant_execution", + "singleton", + "LazyImport", + "CpuInfo", ] +def singleton(cls): + """Singleton decorator.""" + + instances = {} + + def _singleton(*args, **kw): + """Create a singleton object.""" + if cls not in instances: + instances[cls] = cls(*args, **kw) + return instances[cls] + + return _singleton + + +class LazyImport(object): + """Lazy import python module till use.""" + + def __init__(self, module_name): + """Init LazyImport object. + + Args: + module_name (string): The name of module imported later + """ + self.module_name = module_name + self.module = None + + def __getattr__(self, name): + """Get the attributes of the module by name.""" + try: + self.module = importlib.import_module(self.module_name) + mod = getattr(self.module, name) + except: + spec = importlib.util.find_spec(str(self.module_name + "." + name)) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + def __call__(self, *args, **kwargs): + """Call the function in that module.""" + function_name = self.module_name.split(".")[-1] + module_name = self.module_name.split(f".{function_name}")[0] + self.module = importlib.import_module(module_name) + function = getattr(self.module, function_name) + return function(*args, **kwargs) + + +@singleton +class CpuInfo(object): + """CPU info collection.""" + + def __init__(self): + """Get whether the cpu numerical format is bf16, the number of sockets, cores and cores per socket.""" + self._bf16 = False + self._vnni = False + info = cpuinfo.get_cpu_info() + if "arch" in info and "X86" in info["arch"]: + cpuid = cpuinfo.CPUID() + max_extension_support = cpuid.get_max_extension_support() + if max_extension_support >= 7: + ecx = cpuid._run_asm( + b"\x31\xC9", # xor ecx, ecx + b"\xB8\x07\x00\x00\x00" b"\x0f\xa2" b"\x89\xC8" b"\xC3", # mov eax, 7 # cpuid # mov ax, cx # ret + ) + self._vnni = bool(ecx & (1 << 11)) + eax = cpuid._run_asm( + b"\xB9\x01\x00\x00\x00", # mov ecx, 1 + b"\xB8\x07\x00\x00\x00" b"\x0f\xa2" b"\xC3", # mov eax, 7 # cpuid # ret + ) + self._bf16 = bool(eax & (1 << 5)) + # TODO: The implementation will be refined in the future. + # https://github.com/intel/neural-compressor/tree/detect_sockets + if "arch" in info and "ARM" in info["arch"]: # pragma: no cover + self._sockets = 1 + else: + self._sockets = self.get_number_of_sockets() + self._cores = psutil.cpu_count(logical=False) + self._cores_per_socket = int(self._cores / self._sockets) + + @property + def bf16(self): + """Get whether it is bf16.""" + return self._bf16 + + @property + def vnni(self): + """Get whether it is vnni.""" + return self._vnni + + @property + def cores_per_socket(self): + """Get the cores per socket.""" + return self._cores_per_socket + + def get_number_of_sockets(self) -> int: + """Get number of sockets in platform.""" + cmd = "cat /proc/cpuinfo | grep 'physical id' | sort -u | wc -l" + if psutil.WINDOWS: + cmd = r'wmic cpu get DeviceID | C:\Windows\System32\find.exe /C "CPU"' + elif psutil.MACOS: # pragma: no cover + cmd = "sysctl -n machdep.cpu.core_count" + + with subprocess.Popen( + args=cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=False, + ) as proc: + proc.wait() + if proc.stdout: + for line in proc.stdout: + return int(line.decode("utf-8", errors="ignore").strip()) + return 0 + + def dump_elapsed_time(customized_msg=""): """Get the elapsed time for decorated functions. diff --git a/requirements_ort.txt b/requirements_ort.txt index cf1352e4395..dc54960543d 100644 --- a/requirements_ort.txt +++ b/requirements_ort.txt @@ -2,4 +2,6 @@ numpy onnx onnxruntime onnxruntime-extensions +psutil +py-cpuinfo pydantic diff --git a/requirements_pt.txt b/requirements_pt.txt index 4365527af7b..311bbcac63d 100644 --- a/requirements_pt.txt +++ b/requirements_pt.txt @@ -1,6 +1,7 @@ auto-round intel_extension_for_pytorch peft +psutil py-cpuinfo pydantic torch diff --git a/test/3x/common/test_utility.py b/test/3x/common/test_utility.py index eb279b6928b..1e7251a6c42 100644 --- a/test/3x/common/test_utility.py +++ b/test/3x/common/test_utility.py @@ -10,7 +10,15 @@ import unittest from neural_compressor.common import options -from neural_compressor.common.utils import set_random_seed, set_resume_from, set_tensorboard, set_workspace +from neural_compressor.common.utils import ( + CpuInfo, + LazyImport, + set_random_seed, + set_resume_from, + set_tensorboard, + set_workspace, + singleton, +) class TestOptions(unittest.TestCase): @@ -55,5 +63,39 @@ def test_set_tensorboard(self): set_tensorboard(tensorboard) +class TestCPUInfo(unittest.TestCase): + def test_cpu_info(self): + cpu_info = CpuInfo() + assert cpu_info.cores_per_socket > 0, "CPU count should be greater than 0" + assert isinstance(cpu_info.bf16, bool), "bf16 should be a boolean" + assert isinstance(cpu_info.vnni, bool), "avx512 should be a boolean" + + +class TestLazyImport(unittest.TestCase): + def test_lazy_import(self): + # Test import + pydantic = LazyImport("pydantic") + assert pydantic.__name__ == "pydantic", "pydantic should be imported" + + def test_lazy_import_error(self): + # Test import error + with self.assertRaises(ImportError): + non_existent_module = LazyImport("non_existent_module") + non_existent_module.non_existent_function() + + +class TestSingletonDecorator: + def test_singleton_decorator(self): + @singleton + class TestSingleton: + def __init__(self): + self.value = 0 + + instance = TestSingleton() + instance.value = 1 + instance2 = TestSingleton() + assert instance2.value == 1, "Singleton should return the same instance" + + if __name__ == "__main__": unittest.main()