diff --git a/npu/build/__init__.py b/npu/build/__init__.py index 0ebf5af..16bea26 100644 --- a/npu/build/__init__.py +++ b/npu/build/__init__.py @@ -2,12 +2,13 @@ # SPDX-License-Identifier: MIT import os -from .utils import is_win, is_win_path, is_wsl_win_path +from .utils import is_win_path, is_wsl_win_path MODULE_PATH = os.path.dirname(os.path.realpath(__file__)) -BUILD_TEMPLATE_PATH = os.path.join(MODULE_PATH,"build_template") +BUILD_TEMPLATE_PATH = os.path.join(MODULE_PATH, "build_template") -def wslpath(winpath:str)->str: + +def wslpath(winpath: str) -> str: """ From the windows path create the equivalent WSL path """ if is_win_path(winpath): drive = winpath[0].lower() @@ -19,4 +20,3 @@ def wslpath(winpath:str)->str: return newpath else: return winpath - diff --git a/npu/build/build_template/kernel_build.sh b/npu/build/build_template/kernel_build.sh index d271595..556f1a6 100644 --- a/npu/build/build_template/kernel_build.sh +++ b/npu/build/build_template/kernel_build.sh @@ -8,6 +8,6 @@ cd $SCRIPT_DIR source /opt/mlir_settings.sh -xchesscc $CHESSCC2_FLAGS -I kernels -c $1.cc -o $1.o #2>&1 | tee $1.log +xchesscc $CHESSCC2_FLAGS -I kernels -c $1.cc -o $1.o -d #2>&1 | tee $1.log echo "Successfully built $1.o" \ No newline at end of file diff --git a/npu/build/kernel.py b/npu/build/kernel.py index 066aa24..dc82f10 100644 --- a/npu/build/kernel.py +++ b/npu/build/kernel.py @@ -12,14 +12,13 @@ from .port import BufferPort, RTPPort from typing import Optional, Callable, List, Dict import re -import warnings class Kernel(KernelMeta): - """This class encapsulates a ComputeTile kernel C/C++ src code and methods to generate a compiled object - that compiled object - is used within MLIR to build the final xclbin application. Additionally, the kernel is parsed for input and output ports and can - encapsulates a behavioral model to capture functional behavior and data shaping from input to output ports. This metadata - for the kernel enables behavioral execution to verify correctness in Python and also tracing to build the final AppBuilder xclbin. + """This class encapsulates a ComputeTile kernel C/C++ src code and methods to generate a compiled object - that compiled object + is used within MLIR to build the final xclbin application. Additionally, the kernel is parsed for input and output ports and can + encapsulates a behavioral model to capture functional behavior and data shaping from input to output ports. This metadata + for the kernel enables behavioral execution to verify correctness in Python and also tracing to build the final AppBuilder xclbin. Attributes ---------- @@ -32,15 +31,17 @@ class Kernel(KernelMeta): behavioralfx : function The behavioral function that emulates the C/C++ kernel's behavior. """ - def __init__(self, srccode : str, behavioralfx:Optional[Callable]=None, top_function:Optional[str]=None, requires_boilerplate:bool=False) -> None: - """Return a new Kernel object.""" + def __init__(self, srccode: str, behavioralfx: Optional[Callable] = None, + top_function: Optional[str] = None, + requires_boilerplate: bool = False) -> None: + """Return a new Kernel object.""" if srccode.endswith('.cpp') or srccode.endswith('.cc'): with open(srccode, 'r') as file: self._srccode = file.read() self.srcfile = srccode else: self._srccode = srccode - self.srcfile = None + self.srcfile = None self._requires_boilerplate = requires_boilerplate self._top_function = top_function @@ -51,16 +52,17 @@ def __init__(self, srccode : str, behavioralfx:Optional[Callable]=None, top_func super().__init__(kname, kname, kname, "CT", ports=_parsed_ports) if behavioralfx is None: - self.behavioralfx = _default_behavioral_validate_bufferports + self.behavioralfx = _default_behavioral_validate_bufferports else: self.behavioralfx = behavioralfx self.kb = KernelObjectBuilder(self.ktype, self.srccode, self.srcfile) + self._asmlst = None self._main_function_sanity_check() self._extern_c_check() self._expose_ports() - def _expose_ports(self)->None: + def _expose_ports(self) -> None: for p in self.ports: setattr(self, p.name, p) @@ -75,7 +77,7 @@ def _parse_code(self): parsedname = self._top_function return parsedname, allports, functions[parsedname] - def _parse_functions(self, functions_l:List)->Dict: + def _parse_functions(self, functions_l: List) -> Dict: """Parse the functions list into a dict.""" f = {} for funcname in functions_l: @@ -94,7 +96,7 @@ def _parse_functions(self, functions_l:List)->Dict: f[funcname['name']] = e return f - def _main_function_sanity_check(self)->None: + def _main_function_sanity_check(self) -> None: if not self._main_function['rtnType'] == "void": raise RuntimeError(f"The return type of the top_level function should be void not {self._main_function['rtnType']}") @@ -105,16 +107,16 @@ def _extern_c_check(self): raise SyntaxError('extern "C" not found. Top level function ' 'should be wrapped by extern "C"') - def display(self)->None: + def display(self) -> None: """Render the kernel code in a jupyter notebook.""" from IPython.display import display, Code - _code = Code(self._srccode, language="cpp") + _code = Code(self.srccode, language="cpp") display(_code) - def completed_srccode(self)->str: + def completed_srccode(self) -> str: """From the parsed information generate the source.""" if self._requires_boilerplate: - preamble= f""" + preamble = """ #define NOCPP #include #include @@ -138,23 +140,23 @@ def completed_srccode(self)->str: # nested { } match = re.search(pattern, s) end_index = self._find_matching_brackets(s, match.end()-1) - s = s[:end_index] + '\n} // extern end\n' + s[end_index:] + s = s[:end_index] + '\n} // extern end\n' + s[end_index:] else: s = f"{self._srccode}" return s - def _get_ptr_type_depth(self, arg)->int: + def _get_ptr_type_depth(self, arg) -> int: arg = arg.rstrip() count = 0 for i in reversed(arg): - if i=="*": + if i == "*": count += 1 else: break return count - def _find_matching_brackets(self, s, start_index:int): + def _find_matching_brackets(self, s, start_index: int): stack = [] for i in range(start_index, len(s)): if s[i] == '{': @@ -162,12 +164,13 @@ def _find_matching_brackets(self, s, start_index:int): elif s[i] == '}': stack.pop() - if len(stack)==0: - return i+1 + if len(stack) == 0: + return i+1 - raise RuntimeError(f"Unable to find closing brace for {self._main_function['fullsig']}") + raise RuntimeError("Unable to find closing brace for " + f"{self._main_function['fullsig']}") - def to_cpp(self)->None: + def to_cpp(self) -> None: """ output source code to a .cpp file""" with open(f'{self.name}.cpp', 'w') as file: file.write(self.completed_srccode()) @@ -181,7 +184,7 @@ def __call__(self, *args, behavioral_n_tracing=True): ''' if self.behavioralfx is None: - raise ValueError(f'Unable to trace Kernel I/O with no behavioral model for kernel {self.name}') + raise ValueError(f'Unable to trace Kernel I/O with no behavioral model for kernel {self.name}') bufferargs = [a for a in args if isinstance(a, BufferPort) or isinstance(a, Buffer) or isinstance(a, np.ndarray)] rtpargs = [a for a in args if isinstance(a, int)] @@ -215,24 +218,23 @@ def _validate_args(self, args, bufferargs, rtpargs, behavioral_n_tracing): def _set_arg_io_values(self, bufferargs, rtpargs): '''Map C/C++ arguments to BufferPorts and RTP ports.''' - mapped_buffers = itertools.zip_longest(self.bufferports,bufferargs) + mapped_buffers = itertools.zip_longest(self.bufferports, bufferargs) for bufferport, input_arg in mapped_buffers: if input_arg is not None: - bufferport.io = 'in' - bufferport.array = Buffer.to_ndarray(input_arg) + bufferport.io = 'in' + bufferport.array = Buffer.to_ndarray(input_arg) else: bufferport.io = 'out' bufferport.slices = list() - for rtpport, call_value in zip(self.rtpports,rtpargs): + for rtpport, call_value in zip(self.rtpports, rtpargs): rtpport.value = call_value - def create_outputs(self, behavioral_n_tracing): """From kernel call, produce the output value or tuple.""" if behavioral_n_tracing is True: - outputs = [Buffer.to_ndarray(op) for op in self.outputbufferports] + outputs = [Buffer.to_ndarray(op) for op in self.outputbufferports] else: outputs = self.outputbufferports @@ -245,24 +247,39 @@ def build(self, debug=False): """Build the kernel object file for linking into the complete application.""" if not os.path.exists(self.kb.buildobjpath): self.kb.build(debug) + with open(self.kb.buildobjpath + '.lst', 'r', encoding='utf-8') as file: + vliwasm = file.read() + self._asmlst = '\n'.join(vliwasm.split('\n')[6:]) @property def objfile(self): self.build() return self.kb.buildobjpath - + + @property + def asm(self): + """Returns string of VLIW Assembly instructions""" + if self._asmlst is None: + raise RuntimeError(f'Kernel: {self.name} is not built (compiled). ' + 'Build kernel to check assembly') + return self._asmlst + + def asmdisplay(self) -> None: + """Render the VLIW Assembly instructions in a jupyter notebook""" + from IPython.display import display, Code + display(Code(self.asm, language="c-objdump")) + def _parsecpp_to_ports(self, parsedcpp): - bufferports = [BufferPort(param['name'], param['type']) - for param in parsedcpp.functions[-1]["parameters"] - if '*' in param['type'] ] - - rtpports = [RTPPort(param['name'], param['type'], c_dtype=param['type']) - for param in parsedcpp.functions[-1]["parameters"] - if '*' not in param['type'] ] - - + bufferports = [BufferPort(param['name'], param['type']) + for param in parsedcpp.functions[-1]["parameters"] + if '*' in param['type']] + + rtpports = [RTPPort(param['name'], param['type'], c_dtype=param['type']) + for param in parsedcpp.functions[-1]["parameters"] + if '*' not in param['type']] + return bufferports + rtpports - + def _default_behavioral_validate_bufferports(invobj): """ A behavioural model that gives users guidance to build a behavorial model or @@ -273,4 +290,3 @@ def _default_behavioral_validate_bufferports(invobj): raise RuntimeError(f"Default behavioral model is being used but cannot determine shape for port {p.name} - \ please specify a behavioral function for this kernel or set the array sizes before \ using the kernel. E.g. {p.name}.array = np.ndarray(...)") - diff --git a/npu/build/kernelbuilder.py b/npu/build/kernelbuilder.py index a66deed..81f53d1 100644 --- a/npu/build/kernelbuilder.py +++ b/npu/build/kernelbuilder.py @@ -54,6 +54,7 @@ def __init__(self, name, srccode, srcfile) -> None: self.buildlog = os.path.join(self.build_path, f'{self.name}.log') self.buildobjpath = os.path.join(self.build_path, f'{self.name}.o') + self.buildasmpath = os.path.join(self.build_path, f'{self.name}.o.lst') def build(self, debug=False): """Build the kernel object file and copy it to self.prebuilt_objpath.""" @@ -62,6 +63,7 @@ def build(self, debug=False): print(f"Using cached {self.name} kernel object file...") self._wslcall(f"{wsl_prefix()}cp", [f"{wslpath(self.prebuilt_objpath)}", f"{wslpath(self.buildobjpath)}"], debug) + self._wslcall(f"{wsl_prefix()}cp", [f"{wslpath(self.prebuilt_objpath + '.lst')}", f"{wslpath(self.buildobjpath) + '.lst'}"], debug) else: print(f"Building the {self.name} kernel...") @@ -78,6 +80,7 @@ def build(self, debug=False): self._wslcall(f"{wsl_prefix()}bash", [f"{wslpath(self.build_path)}/kernel_build.sh", f"{self.name}"], debug) self._wslcall(f"{wsl_prefix()}cp", [f"{wslpath(self.buildobjpath)}", f"{wslpath(self.prebuilt_objpath)}"], debug) + self._wslcall(f"{wsl_prefix()}cp", [f"{wslpath(self.buildasmpath)}", f"{wslpath(self.prebuilt_objpath + '.lst')}"], debug) self.update_cache_md5() diff --git a/tests/test_asm.py b/tests/test_asm.py new file mode 100644 index 0000000..aafed0f --- /dev/null +++ b/tests/test_asm.py @@ -0,0 +1,38 @@ +# Copyright 2024 Advanced Micro Devices, Inc. +# SPDX-License-Identifier: MIT + +import pytest +from npu.lib import Plus1, PlusN, RgbaRtpThres +from npu.build.kernelbuilder import KernelObjectBuilder + + +KernelObjectBuilder.clear_cache() + + +def test_asm_kernel_built(): + """Test if a built kernel returns a string of text""" + + kernelobj = Plus1() + kernelobj.build() + kernelobj.asmdisplay() + assert kernelobj.asm + + +def test_asm_kernel_notbuilt_asm(): + """Test if a non built kernel returns RuntimeError when calling asm""" + + kernelobj = PlusN() + with pytest.raises(RuntimeError) as excinfo: + _ = kernelobj.asm + + assert 'is not built (compiled)' in str(excinfo.value) + + +def test_asm_kernel_notbuilt_asmdisplay(): + """Test if a non built kernel returns RuntimeError when calling asmdisplay""" + + kernelobj = RgbaRtpThres() + with pytest.raises(RuntimeError) as excinfo: + _ = kernelobj.asmdisplay() + + assert 'is not built (compiled)' in str(excinfo.value)