forked from facebookresearch/dlrm
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmlperf_logger.py
118 lines (88 loc) · 2.86 KB
/
mlperf_logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Utilities for MLPerf logging
"""
import os
import torch
# mlperf_logging is treated as optional at import time: a failed import is
# reported on stdout instead of aborting the module load.
# NOTE: when the import fails, `mllog`, `constants` and `_MLLOGGER` are never
# bound, so any function in this module that touches them raises NameError
# at call time.
try:
    from mlperf_logging import mllog
    from mlperf_logging.mllog import constants

    # Single module-wide logger handle shared by every helper in this file.
    _MLLOGGER = mllog.get_mllogger()
except ImportError as exc:
    print("Unable to import mlperf_logging, ", exc)
def log_start(*args, **kwargs):
    """Emit an MLPerf record through the logger's ``start`` method.

    All positional and keyword arguments are handed to ``_log_print``
    unchanged; that helper fills in defaults and decides whether this rank
    actually logs.
    """
    # Call _log_print directly from this frame: its default stack_offset
    # presumably depends on this exact call depth -- confirm before adding
    # any intermediate wrapper.
    start_fn = _MLLOGGER.start
    _log_print(start_fn, *args, **kwargs)
def log_end(*args, **kwargs):
    """Emit an MLPerf record through the logger's ``end`` method.

    All positional and keyword arguments are handed to ``_log_print``
    unchanged; that helper fills in defaults and decides whether this rank
    actually logs.
    """
    # Call _log_print directly from this frame: its default stack_offset
    # presumably depends on this exact call depth -- confirm before adding
    # any intermediate wrapper.
    end_fn = _MLLOGGER.end
    _log_print(end_fn, *args, **kwargs)
def log_event(*args, **kwargs):
    """Emit an MLPerf record through the logger's ``event`` method.

    All positional and keyword arguments are handed to ``_log_print``
    unchanged; that helper fills in defaults and decides whether this rank
    actually logs.
    """
    # Call _log_print directly from this frame: its default stack_offset
    # presumably depends on this exact call depth -- confirm before adding
    # any intermediate wrapper.
    event_fn = _MLLOGGER.event
    _log_print(event_fn, *args, **kwargs)
def _log_print(logger, *args, **kwargs):
"makes mlperf logger aware of distributed execution"
if 'stack_offset' not in kwargs:
kwargs['stack_offset'] = 3
if 'value' not in kwargs:
kwargs['value'] = None
if kwargs.pop('log_all_ranks', False):
log = True
else:
log = (get_rank() == 0)
if log:
logger(*args, **kwargs)
def config_logger(benchmark):
    """Configure the MLPerf logger to write ``<benchmark>.log`` next to this file.

    Args:
        benchmark: Name used as the stem of the log file.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    log_path = os.path.join(module_dir, f'{benchmark}.log')
    mllog.config(filename=log_path)
    # Turn off propagation on the wrapped logger object (presumably to avoid
    # duplicate output through parent handlers -- confirm).
    _MLLOGGER.logger.propagate = False
def barrier():
    """
    Stand-in for a distributed barrier.

    Per the original note, this exists because PyTorch did not implement
    barrier for the NCCL backend. It runs an all_reduce on a dummy
    one-element CUDA tensor and then synchronizes with the GPU.

    Does nothing when torch.distributed is unavailable or has not been
    initialized.
    """
    dist = torch.distributed
    if not dist.is_available() or not dist.is_initialized():
        return

    dummy = torch.cuda.FloatTensor(1)
    dist.all_reduce(dummy)
    torch.cuda.synchronize()
def get_rank():
    """
    Return this process's rank in the distributed group.

    Falls back to 0 when torch.distributed is unavailable or has not been
    initialized.
    """
    dist = torch.distributed
    if dist.is_available() and dist.is_initialized():
        return dist.get_rank()
    return 0
def mlperf_submission_log(benchmark):
    """
    Configure the logger and emit the MLPerf submission metadata events.

    Args:
        benchmark: Benchmark name; used for the log file name and as the
            value of the SUBMISSION_BENCHMARK event.
    """
    config_logger(benchmark)

    # (key, value) pairs, emitted in this order.
    submission_fields = (
        (constants.SUBMISSION_BENCHMARK, benchmark),
        (constants.SUBMISSION_ORG, 'reference_implementation'),
        (constants.SUBMISSION_DIVISION, 'closed'),
        (constants.SUBMISSION_STATUS, 'onprem'),
        (constants.SUBMISSION_PLATFORM, 'reference_implementation'),
        (constants.SUBMISSION_ENTRY, 'reference_implementation'),
        (constants.SUBMISSION_POC_NAME, 'reference_implementation'),
        (constants.SUBMISSION_POC_EMAIL, 'reference_implementation'),
    )
    # A plain loop keeps log_event called directly from this function's frame.
    for field_key, field_value in submission_fields:
        log_event(key=field_key, value=field_value)