Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【Hackathon 6th Fundable Projects 4 No.3】remove ExecutionStrategy in python -part #63132

Merged
merged 9 commits into from
Apr 12, 2024
1 change: 0 additions & 1 deletion python/paddle/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
from .compiler import ( # noqa: F401
BuildStrategy,
CompiledProgram,
ExecutionStrategy,
IpuCompiledProgram,
IpuStrategy,
)
Expand Down
53 changes: 2 additions & 51 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,45 +262,6 @@ def load_from_prototxt(self, pb_file):
str(f.read()), self.strategy
)

@property
def execution_strategy(self):
"""
Configure ExecutionStrategy for DistributedStrategy

Examples:
.. code-block:: python

>>> import paddle
>>> exe_strategy = paddle.static.ExecutionStrategy()
>>> exe_strategy.num_threads = 10
>>> exe_strategy.num_iteration_per_drop_scope = 10
>>> exe_strategy.num_iteration_per_run = 10

>>> strategy = paddle.distributed.fleet.DistributedStrategy()
>>> strategy.execution_strategy = exe_strategy

"""
execution_strategy = paddle.static.ExecutionStrategy()
fields = self.strategy.execution_strategy.DESCRIPTOR.fields
for f in fields:
setattr(
execution_strategy,
f.name,
getattr(self.strategy.execution_strategy, f.name),
)
return execution_strategy

@execution_strategy.setter
@is_strict_auto
def execution_strategy(self, strategy):
fields = self.strategy.execution_strategy.DESCRIPTOR.fields
for f in fields:
setattr(
self.strategy.execution_strategy,
f.name,
getattr(strategy, f.name),
)

@property
def build_strategy(self):
"""
Expand Down Expand Up @@ -2660,7 +2621,7 @@ def __repr__(self):

env_draws = line + "\n"
for f in fields:
if "build_strategy" in f.name or "execution_strategy" in f.name:
if "build_strategy" in f.name:
continue
if "_configs" in f.name:
continue
Expand Down Expand Up @@ -2736,15 +2697,5 @@ def __repr__(self):
)
build_strategy_str += border + "\n"

execution_strategy_str = h1_format.format("Execution Strategy")
execution_strategy_str += line + "\n"

fields = self.strategy.execution_strategy.DESCRIPTOR.fields
for f in fields:
execution_strategy_str += h2_format.format(
f.name, str(getattr(self.strategy.execution_strategy, f.name))
)
execution_strategy_str += border + "\n"

result_res += build_strategy_str + execution_strategy_str
result_res += build_strategy_str
return result_res
16 changes: 0 additions & 16 deletions python/paddle/incubate/distributed/fleet/collective.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,6 @@ def __init__(self):
self.use_amp = False # use mixed precision optimizer
self.amp_loss_scaling = 2**15

self.exec_strategy = base.ExecutionStrategy()

# configurations below are used for unit test
self._ut4grad_allreduce = False

Expand Down Expand Up @@ -412,8 +410,6 @@ def _try_to_compile(self, startup_program, main_program):
node_num = self._node_num()
assert node_num >= 1, "nccl2 node_num must >= 1, now:{}" % node_num

exec_strategy = self._strategy.exec_strategy

if node_num <= 1:
if self._strategy.nccl_comm_num > 1:
logging.warn("set nccl_comm_num=1 since you only have 1 node.")
Expand All @@ -426,22 +422,12 @@ def _try_to_compile(self, startup_program, main_program):
self._strategy.use_hierarchical_allreduce = False

sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")
if sync_allreduce is None or sync_allreduce == "1":
exec_strategy.num_threads = self._strategy.nccl_comm_num + 1
if self._strategy.use_hierarchical_allreduce:
exec_strategy.num_threads = 2 * self._strategy.nccl_comm_num + 1
if exec_strategy.num_threads > 4:
logging.warn(
"if you use use_hierarchical_allreduce or "
"with multi nccl comm, please export FLAGS_sync_nccl_allreduce = 0"
)

# NOTE. open sync_batch_norm will hang when use multi num_threads
sync_batch_norm = self._strategy.sync_batch_norm
if sync_batch_norm is not None and sync_batch_norm is True:
self._strategy.nccl_comm_num = 1
self._strategy.use_hierarchical_allreduce = False
exec_strategy.num_threads = 1
logging.warn(
"use sync_batch_norm will hang when set num_threads > 1, so "
"set num_threads=1, nccl_comm_num=1, use_hierarchical_allreduce=False."
Expand All @@ -451,8 +437,6 @@ def _try_to_compile(self, startup_program, main_program):
print(
"node_num:",
node_num,
"num_threads:",
exec_strategy.num_threads,
"use_hierarchical_allreduce:",
self._strategy.use_hierarchical_allreduce,
"nccl_comm_num:",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,8 @@ def __init__(self):
self._server_runtime_config = ServerRuntimeConfig()
num_threads = int(os.getenv("CPU_NUM", "1"))

self._execute_strategy = base.ExecutionStrategy()
self._build_strategy = base.BuildStrategy()

self._execute_strategy.num_threads = num_threads
if num_threads > 1:
self._build_strategy.reduce_strategy = (
base.BuildStrategy.ReduceStrategy.Reduce
Expand Down Expand Up @@ -281,31 +279,6 @@ def check_server_runtime_config(self):
"check_server_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)

def get_execute_strategy(self):
return self._execute_strategy

def set_execute_strategy(self, config):
if isinstance(config, base.ExecutionStrategy):
self._execute_strategy = config
elif isinstance(config, dict):
for key in config:
if hasattr(self._execute_strategy, key):
setattr(self._execute_strategy, key, config[key])
else:
raise ValueError(
f"ExecutionStrategy doesn't have key: {key}"
)
else:
raise TypeError(
"execute_strategy only accept input type: dict or ExecutionStrategy"
)
self.check_execute_strategy()

def check_execute_strategy(self):
raise NotImplementedError(
"check_execute_strategy must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)

def get_build_strategy(self):
return self._build_strategy

Expand Down Expand Up @@ -337,7 +310,6 @@ def __init__(self):
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()

def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.SYNC
Expand All @@ -351,9 +323,6 @@ def check_program_config(self):
def check_server_runtime_config(self):
pass

def check_execute_strategy(self):
self._execute_strategy.use_thread_barrier = True

def check_build_strategy(self):
self._build_strategy.async_mode = True

Expand All @@ -365,7 +334,6 @@ def __init__(self):
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()

def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.ASYNC
Expand All @@ -377,9 +345,6 @@ def check_program_config(self):
def check_server_runtime_config(self):
pass

def check_execute_strategy(self):
pass

def check_build_strategy(self):
self._build_strategy.async_mode = True

Expand All @@ -391,7 +356,6 @@ def __init__(self):
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()

def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.HALF_ASYNC
Expand All @@ -404,9 +368,6 @@ def check_program_config(self):
def check_server_runtime_config(self):
pass

def check_execute_strategy(self):
self._execute_strategy.use_thread_barrier = True

def check_build_strategy(self):
self._build_strategy.async_mode = True

Expand All @@ -419,7 +380,6 @@ def __init__(self, update_frequency=100):
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()

def check_program_config(self):
self._program_config.sync_mode = False
Expand All @@ -440,9 +400,6 @@ def check_trainer_runtime_config(self):
def check_server_runtime_config(self):
pass

def check_execute_strategy(self):
pass

def check_build_strategy(self):
self._build_strategy.async_mode = True

Expand Down
2 changes: 0 additions & 2 deletions python/paddle/static/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from ..base.compiler import (
BuildStrategy,
CompiledProgram,
ExecutionStrategy,
IpuCompiledProgram,
IpuStrategy,
)
Expand Down Expand Up @@ -82,7 +81,6 @@
'IpuStrategy',
'Print',
'py_func',
'ExecutionStrategy',
'name_scope',
'program_guard',
'WeightNormParamAttr',
Expand Down
2 changes: 0 additions & 2 deletions python/paddle/static/quantization/adaround.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ def run_adaround(
fetch_op_name = quant_op_out_name

# build adaround program
exec_strategy = static.ExecutionStrategy()
exec_strategy.num_iteration_per_drop_scope = 1
startup_program = static.Program()
train_program = static.Program()
with static.program_guard(train_program, startup_program):
Expand Down
25 changes: 0 additions & 25 deletions test/collective/fleet/test_distributed_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def test_sync_strategy(self):
self.assertEqual(strategy._program_config.sync_mode, False)
self.assertEqual(strategy._program_config.runtime_split_send_recv, True)
self.assertEqual(strategy._build_strategy.async_mode, True)
self.assertEqual(strategy._execute_strategy.num_threads, 2)

# test set_program_config using DistributeTranspilerConfig()
program_config_class = DistributeTranspilerConfig()
Expand Down Expand Up @@ -160,30 +159,6 @@ def test_async_strategy(self):
trainer_runtime_config_illegal,
)

# test set_execute_strategy using base.ExecutionStrategy
exec_strategy_class = base.ExecutionStrategy()
exec_strategy_class.num_threads = 4
strategy.set_execute_strategy(exec_strategy_class)
exec_strategy = strategy.get_execute_strategy()
self.assertEqual(exec_strategy.num_threads, 4)

# test set_execute_strategy using dict
exec_strategy_dict = {}
exec_strategy_dict['num_threads'] = 8
strategy.set_execute_strategy(exec_strategy_dict)
exec_strategy = strategy.get_execute_strategy()
self.assertEqual(exec_strategy.num_threads, 8)

# test set_execute_strategy exception
exec_strategy_dict['unknown'] = None
self.assertRaises(
Exception, strategy.set_execute_strategy, exec_strategy_dict
)
exec_strategy_illegal = None
self.assertRaises(
Exception, strategy.set_execute_strategy, exec_strategy_illegal
)

def test_half_async_strategy(self):
strategy = StrategyFactory.create_half_async_strategy()
self.assertEqual(strategy._program_config.sync_mode, False)
Expand Down
14 changes: 0 additions & 14 deletions test/collective/fleet/test_fleet_distributed_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,6 @@ def test_strategy_prototxt(self):
build_strategy.enable_backward_optimizer_op_deps = True
build_strategy.trainers_endpoints = ["1", "2"]
strategy.build_strategy = build_strategy
exe_strategy = paddle.base.ExecutionStrategy()
exe_strategy.num_threads = 10
exe_strategy.num_iteration_per_drop_scope = 10
exe_strategy.num_iteration_per_run = 10
strategy.execution_strategy = exe_strategy
strategy.save_to_prototxt("dist_strategy.prototxt")
strategy2 = paddle.distributed.fleet.DistributedStrategy()
strategy2.load_from_prototxt("dist_strategy.prototxt")
Expand All @@ -501,15 +496,6 @@ def test_build_strategy(self):
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.build_strategy = build_strategy

def test_execution_strategy(self):
exe_strategy = paddle.base.ExecutionStrategy()
exe_strategy.num_threads = 10
exe_strategy.num_iteration_per_drop_scope = 10
exe_strategy.num_iteration_per_run = 10

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.execution_strategy = exe_strategy

def test_unknown_strategy(self):
strategy = paddle.distributed.fleet.DistributedStrategy()
with self.assertRaises(TypeError):
Expand Down
9 changes: 3 additions & 6 deletions test/legacy_test/parallel_executor_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def run_executor(exe, binary, feed, fetch_list):
exe = base.Executor(place)
exe.run(startup)

build_strategy, exec_strategy = cls.set_strategy(
build_strategy = cls.set_strategy(
enable_inplace,
enable_sequential_execution,
fuse_all_optimizer_ops,
Expand Down Expand Up @@ -188,7 +188,7 @@ def check_pass_conflict(
exe = base.Executor(place)
exe.run(startup)

build_strategy, exec_strategy = cls.set_strategy(
build_strategy = cls.set_strategy(
enable_inplace,
enable_sequential_execution,
fuse_all_optimizer_ops,
Expand Down Expand Up @@ -222,9 +222,6 @@ def set_strategy(
use_reduce,
use_device,
):
exec_strategy = base.ExecutionStrategy()
if use_fast_executor:
exec_strategy.use_experimental_executor = True
build_strategy = base.BuildStrategy()
build_strategy.reduce_strategy = (
base.BuildStrategy.ReduceStrategy.Reduce
Expand All @@ -249,7 +246,7 @@ def set_strategy(
build_strategy.enable_inplace = False
build_strategy.enable_sequential_execution = False

return build_strategy, exec_strategy
return build_strategy

@classmethod
def build_model(
Expand Down
2 changes: 0 additions & 2 deletions test/legacy_test/test_directory_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def test_new_directory(self):
'paddle.static.gradients',
'paddle.static.BuildStrategy',
'paddle.static.CompiledProgram',
'paddle.static.ExecutionStrategy',
'paddle.static.default_main_program',
'paddle.static.default_startup_program',
'paddle.static.Program',
Expand Down Expand Up @@ -157,7 +156,6 @@ def test_old_directory(self):
'paddle.gradients',
'paddle.BuildStrategy',
'paddle.CompiledProgram',
'paddle.ExecutionStrategy',
'paddle.name_scope',
'paddle.program_guard',
'paddle.Print',
Expand Down
Loading