Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ROOFLINE] Roofline analysis over RPC #11252

Merged
merged 5 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 91 additions & 16 deletions python/tvm/utils/roofline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@
from typing import Dict, Union, Optional
import numpy as np

from .. import auto_scheduler, relay, tir, nd, IRModule, build, topi, transform
from .. import auto_scheduler, relay, tir, nd, IRModule, build, topi, transform, get_global_func
from ..target import Target
from ..runtime import profiler_vm, profiling, Device, num_threads
from ..script import tir as T
from ..ir.instrument import pass_instrument
from ..ir.expr import GlobalVar
from ..rpc.base import RPC_SESS_MASK
from ..rpc.client import RPCSession
from ..contrib import utils


def _create_args(mod: IRModule, dev: Device, func_name: str = "main"):
def _create_args(mod: IRModule, dev: Device, func_name: str = "main", remote=None):
if dev.device_type >= RPC_SESS_MASK:
random_fill = remote.get_function("tvm.contrib.random.random_fill")
else:
random_fill = get_global_func("tvm.contrib.random.random_fill")
assert random_fill, "Please make sure USE_RANDOM is ON in config.cmake"
args = []
for arg in mod[func_name].params:
args.append(
nd.array(
np.zeros([x.value for x in arg.type_annotation.shape], arg.type_annotation.dtype),
device=dev,
)
ary = nd.empty(
[x.value for x in arg.type_annotation.shape],
arg.type_annotation.dtype,
device=dev,
)
random_fill(ary)
args.append(ary)
return args


Expand Down Expand Up @@ -103,6 +112,7 @@ def estimate_peak_fma_flops(
dev: Device,
vec_width: Optional[int] = None,
num_vector_registers: Optional[int] = None,
remote: Optional[RPCSession] = None,
) -> float:
"""
Estimate the maximum number of FLOP/s this target/device combo is capable
Expand All @@ -123,6 +133,9 @@ def estimate_peak_fma_flops(
num_vector_registers : Optional[int]
Number of vector registers on the underlying hardware. Will try to
infer if no value is provided.
remote : Optional[RPCSession]
Remote session used to upload artifacts for runtime evaluation. Must be
the same session used to create `dev`.

Returns
-------
Expand All @@ -146,7 +159,23 @@ def estimate_peak_fma_flops(
)
with transform.PassContext(opt_level=3):
f = build(specialized, target=target)
a = nd.array(np.ones((nthreads, num_vector_registers, vec_width), dtype="float32"), device=dev)

# upload to remote if running over rpc
if dev.device_type >= RPC_SESS_MASK:
if remote is None:
raise RuntimeError("A RPCSession must be provided when using a remote device.")
temp = utils.tempdir()
path = temp.relpath("peak_fma_flops.tar")
f.export_library(path)
remote.upload(path)
f = remote.load_module("peak_fma_flops.tar")
random_fill = remote.get_function("tvm.contrib.random.random_fill")
else:
random_fill = get_global_func("tvm.contrib.random.random_fill")
assert random_fill, "Please make sure USE_RANDOM is ON in config.cmake"

a = nd.empty((nthreads, num_vector_registers, vec_width), dtype="float32", device=dev)
random_fill(a)
times = f.time_evaluator(f.entry_name, dev, repeat=100, number=1)(a)
flops = 2 * vec_width * num_vector_registers * nthreads * iters # fma is two flops
flop_s = flops / times.min
Expand All @@ -171,7 +200,12 @@ def peak_bandwidth_tir(a: T.handle, b: T.handle, threads: T.int32, vec_width: T.
B[i, l, j] += A[i, k, l, j]


def estimate_peak_bandwidth(target: Target, dev: Device, vec_width: Optional[int] = None) -> float:
def estimate_peak_bandwidth(
target: Target,
dev: Device,
vec_width: Optional[int] = None,
remote: Optional[RPCSession] = None,
) -> float:
"""Estimate peak memory bandwidth of a target/device combo.

Peak bandwidth is estimated by running a small experiment on the underlying
Expand All @@ -187,6 +221,9 @@ def estimate_peak_bandwidth(target: Target, dev: Device, vec_width: Optional[int
Device to measure peak bandwidth on.
vec_width : Optional[int]
Vector unit width, determined from target if not supplied.
remote : Optional[RPCSession]
Remote session used to upload artifacts for runtime evaluation. Must be
the same session used to create `dev`.

Returns
-------
Expand All @@ -207,13 +244,30 @@ def estimate_peak_bandwidth(target: Target, dev: Device, vec_width: Optional[int
)
with transform.PassContext(opt_level=3):
f = build(specialized, target=target)

# upload to remote if running over rpc
if dev.device_type >= RPC_SESS_MASK:
if remote is None:
raise RuntimeError("A RPCSession must be provided when using a remote device.")
temp = utils.tempdir()
path = temp.relpath("peak_bandwidth.tar")
f.export_library(path)
remote.upload(path)
f = remote.load_module("peak_bandwidth.tar")
random_fill = remote.get_function("tvm.contrib.random.random_fill")
else:
random_fill = get_global_func("tvm.contrib.random.random_fill")
assert random_fill, "Please make sure USE_RANDOM is ON in config.cmake"

threads = num_threads()
# Data size needs to be larger than last level of cache. We don't have a
# way of getting cache sizes, so this number should give us a large enough
# size.
size = 10**8 // (4 * threads * vec_width)
a = nd.array(np.ones((threads, size, 4, vec_width), dtype="float32"), device=dev)
b = nd.array(np.ones((threads, vec_width, 4), dtype="float32"), device=dev)
a = nd.empty((threads, size, 4, vec_width), dtype="float32", device=dev)
random_fill(a)
b = nd.empty((threads, vec_width, 4), dtype="float32", device=dev)
random_fill(b)
times = f.time_evaluator(f.entry_name, dev, repeat=10, number=1)(a, b, threads)
return a.numpy().size * 4 / times.min # 4 bytes per float32

Expand Down Expand Up @@ -241,6 +295,7 @@ def roofline_from_existing(
tir_functions: Dict[GlobalVar, tir.PrimFunc],
target: Target,
dev: Device,
remote: Optional[RPCSession] = None,
) -> profiling.Report:
"""Add roofline and other estimated statistics to an existing profiling report.

Expand Down Expand Up @@ -290,6 +345,9 @@ def roofline_from_existing(
TVM target that `report` was generated with.
dev : Device
Device that `report` was generated with.
remote : Optional[RPCSession]
Remote session used to upload artifacts for runtime evaluation. Must be
the same session used to create `dev`.

Returns
-------
Expand All @@ -299,8 +357,8 @@ def roofline_from_existing(
:py:func:`roofline_analysis` for more information on which metrics
are included.
"""
peak_bandwidth = estimate_peak_bandwidth(target, dev)
peak_flops = estimate_peak_fma_flops(target, dev)
peak_bandwidth = estimate_peak_bandwidth(target, dev, remote=remote)
peak_flops = estimate_peak_fma_flops(target, dev, remote=remote)

ridge_point = peak_flops / peak_bandwidth

Expand Down Expand Up @@ -346,7 +404,11 @@ def roofline_from_existing(


def roofline_analysis(
mod: IRModule, params: Dict[str, nd.NDArray], target: Union[str, Target], dev: Device
mod: IRModule,
params: Dict[str, nd.NDArray],
target: Union[str, Target],
dev: Device,
remote: Optional[RPCSession] = None,
) -> profiling.Report:
"""
Create a profiling report that contains roofline and other estimated
Expand Down Expand Up @@ -385,6 +447,10 @@ def roofline_analysis(
dev : Device
Device to run on.

remote : Optional[RPCSession]
Remote session used to upload artifacts for runtime evaluation. Must be
the same session used to create `dev`.

Returns
-------

Expand All @@ -405,9 +471,18 @@ def roofline_analysis(
config=pass_ctx.config,
):
lib = relay.vm.compile(mod, params=params, target=target)
# upload to remote if running over rpc
if dev.device_type >= RPC_SESS_MASK:
if remote is None:
raise RuntimeError("A RPCSession must be provided when using a remote device.")
temp = utils.tempdir()
path = temp.relpath("roofline_lib.tar")
lib.mod.export_library(path)
remote.upload(path)
lib = remote.load_module("roofline_lib.tar")
vmexec = profiler_vm.VirtualMachineProfiler(lib, dev)

args = _create_args(mod, dev)
args = _create_args(mod, dev, remote=remote)
report = vmexec.profile(*args)

return roofline_from_existing(report, save_tir.functions, target, dev)
return roofline_from_existing(report, save_tir.functions, target, dev, remote=remote)
57 changes: 55 additions & 2 deletions tests/python/unittest/test_runtime_profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,23 @@ def test_estimate_peak_fma_flops(target, dev):
flops = tvm.utils.estimate_peak_fma_flops(tvm.target.Target(target), dev)
# Assume we can achieve 1 GFLOP/s per thread, which is 1 FLOP per cycle on a 1GHz cpu.
assert (
flops > 10**9 * tvm.runtime.num_threads() and flops < 10**14
), f"FLOP/s should be between 10^9 * num_threads and 10^14, but it is {flops}"
flops > 10**9 and flops < 10**14
), f"FLOP/s should be between 10^9 and 10^14, but it is {flops}"


def test_estimate_peak_fma_flops_rpc():
target = "llvm -mattr=+fma,+avx2"
server = rpc.Server(key="profiling")
remote = rpc.connect("127.0.0.1", server.port, key="profiling")
dev = remote.device(target)
flops = tvm.utils.estimate_peak_fma_flops(tvm.target.Target(target), dev, remote=remote)
# Assume we can achieve 1 GFLOP/s per thread, which is 1 FLOP per cycle on a 1GHz cpu.
assert (
flops > 10**9 and flops < 10**14
), f"FLOP/s should be between 10^9 and 10^14, but it is {flops}"


@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386")
@tvm.testing.parametrize_targets("llvm")
def test_estimate_peak_bandwidth(target, dev):
# This test uses vectorized instructions so we need a target that supports them
Expand All @@ -284,6 +297,20 @@ def test_estimate_peak_bandwidth(target, dev):
), f"Bandwidth should be between 10^9 and 10^12, but it is {bandwidth}"


@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386")
def test_estimate_peak_bandwidth_rpc():
target = "llvm -mattr=+fma,+avx2"
server = rpc.Server(key="profiling")
remote = rpc.connect("127.0.0.1", server.port, key="profiling")
dev = remote.device(target)
bandwidth = tvm.utils.estimate_peak_bandwidth(tvm.target.Target(target), dev, remote=remote)
# Assume we can achieve 1 GB/s. DDR2 should transfer somewhere around 6
# GB/s, so this should leave enough wiggle room.
assert (
bandwidth > 10**9 and bandwidth < 10**12
), f"Bandwidth should be between 10^9 and 10^12, but it is {bandwidth}"


@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386")
@tvm.testing.parametrize_targets("llvm")
def test_roofline_analysis(target, dev):
Expand All @@ -304,6 +331,32 @@ def test_roofline_analysis(target, dev):
assert call["Percent of Theoretical Optimal"].ratio >= 0


@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386")
def test_roofline_analysis_rpc():
target = "llvm"

a = relay.var("a", relay.TensorType((512, 512), "float32"))
b = relay.var("b", relay.TensorType((512, 512), "float32"))
c = relay.nn.dense(a, b)
mod = tvm.IRModule.from_expr(relay.Function([a, b], c))
params = {}

server = rpc.Server(key="profiling")
remote = rpc.connect("127.0.0.1", server.port, key="profiling")
dev = remote.device(target)

report = tvm.utils.roofline_analysis(mod, params, target, dev, remote=remote)

assert "Bound" in report.table()
assert "Percent of Theoretical Optimal" in report.table()
for call in report.calls:
if "Percent of Theoretical Optimal" in call:
# Ideally we'd like a little tighter bound here, but it is hard to
# know how well this dense will perform without tuning. And we
# don't have an operator that uses a specific number of flops.
assert call["Percent of Theoretical Optimal"].ratio >= 0


if __name__ == "__main__":
import sys
import pytest
Expand Down