evaluator should be set in TF_CONFIG when using Estimator distribute strategy #1139

Closed
meibenjin opened this issue Mar 9, 2020 · 22 comments

@meibenjin

meibenjin commented Mar 9, 2020

The evaluator is excluded when tf-operator generates the TF_CONFIG environment variable; see: https://github.com/kubeflow/tf-operator/blob/master/pkg/controller.v1/tensorflow/tensorflow.go#L110

However, when using Estimator with a distribution strategy, TF 1.12 raises an error:

[2020-03-09 23:29:07,474] [INFO] [113583#MainThread] [tensorflow/python/estimator/training.py:460] Running `train_and_evaluate` with Distribute Coordinator.
[2020-03-09 23:29:07,474] [INFO] [113583#MainThread] [tensorflow/python/distribute/distribute_coordinator.py:720] Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {u'ps': [u'mbj-ps-0.default.svc:20002'], u'chief': [u'mbj-chief-0.default.svc:20001'], u'worker': [u'mbj-worker-0.default.svc:20000']}, task_type = u'evaluator', task_id = 0, environment = u'cloud', rpc_layer = 'grpc'
[2020-03-09 23:29:07,474] [WARNING] [113583#MainThread] [tensorflow/python/distribute/distribute_coordinator.py:772] `eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
Traceback (most recent call last):
  File "train.py", line 136, in <module>
    tf.app.run()
  File "/usr/lib/python2.7/site-packages/tensorflow/python/platform/app.py", line 128, in run
    _sys.exit(main(argv))
  File "train.py", line 130, in main
    eval_spec)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/estimator/training.py", line 462, in train_and_evaluate
    estimator, train_spec, eval_spec, _TrainingExecutor)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/distribute/estimator_training.py", line 279, in train_and_evaluate
    session_config=run_config.session_config)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/distribute/distribute_coordinator.py", line 786, in run_distribute_coordinator
    environment=environment)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/distribute/distribute_coordinator.py", line 374, in _run_std_server
    target = cluster_spec.task_address(task_type, task_id)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 422, in task_address
    raise ValueError("No such job in cluster: %r" % job_name)
ValueError: No such job in cluster: u'evaluator'

A similar error occurs in TF 1.15:

Traceback (most recent call last):
  File "train.py", line 136, in <module>
    tf.app.run()
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/python/platform/app.py", line 40, in run
    _run(main=main, argv=argv, flags_parser=_parse_flags_tolerate_undef)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/absl/app.py", line 299, in run
    _run_main(main, args)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/absl/app.py", line 250, in _run_main
    sys.exit(main(argv))
  File "train.py", line 105, in main
    train_distribute=tf.contrib.distribute.ParameterServerStrategy(),
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/contrib/distribute/python/parameter_server_strategy.py", line 90, in __init__
    ParameterServerExtended(self, num_gpus_per_worker))
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/contrib/distribute/python/parameter_server_strategy.py", line 132, in __init__
    container_strategy, cluster_resolver=cluster_resolver)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/python/distribute/parameter_server_strategy.py", line 145, in __init__
    parameter_device=parameter_device)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/python/distribute/parameter_server_strategy.py", line 156, in _initialize_strategy
    self._initialize_multi_worker(cluster_resolver)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/python/distribute/parameter_server_strategy.py", line 239, in _initialize_multi_worker
    task_id)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/python/distribute/multi_worker_util.py", line 120, in is_chief
    _validate_cluster_spec(cluster_spec, task_type, task_id)
  File "/home/pai/envs/compat/lib/python2.7/site-packages/tensorflow_core/python/distribute/multi_worker_util.py", line 80, in _validate_cluster_spec
    raise ValueError("`task_type` %r not found in cluster_spec." % task_type)
ValueError: `task_type` 'evaluator' not found in cluster_spec.

TF code with a distribution strategy (1 ps, 1 chief, 1 worker, 1 evaluator):

# -*- coding: utf-8 -*-
"""Example for mnist training using Estimator."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import pai
import os
import json


tf.app.flags.DEFINE_string('works', './train.tfrecords', 'separated by ,')
tf.app.flags.DEFINE_string('model_dir', './', 'model directory')
tf.app.flags.DEFINE_integer('batch_size', 128, 'batch size')
tf.app.flags.DEFINE_integer('max_steps', 50000, 'max training steps')

FLAGS = tf.flags.FLAGS

def model_fn(features, labels, mode):
  """Model function."""
  with tf.variable_scope('lr_softmax'):
    weights = tf.get_variable('weights', initializer=tf.zeros([784, 10]))
    biases = tf.get_variable('biases', initializer=tf.zeros([10]))
    logits = tf.matmul(features, weights) + biases
  loss = tf.reduce_mean(
      tf.nn.softmax_cross_entropy_with_logits_v2(labels=labels, logits=logits),
      name='loss')

  if mode == tf.estimator.ModeKeys.TRAIN:
    global_step = tf.train.get_or_create_global_step()
    opt = tf.train.AdamOptimizer(0.1, name='adam')
    train_op = opt.minimize(loss, global_step=global_step, name='train')
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        train_op=train_op)
  elif mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        eval_metric_ops={'no_eval': (tf.no_op(), tf.no_op())})
  else:
    raise ValueError(
        "Only TRAIN and EVAL modes are supported: %s" % (mode))

def train_input_fn():
  image = tf.zeros([784], dtype=tf.float32)
  label = tf.zeros([10], dtype=tf.int64)
  d = tf.data.Dataset.from_tensors((image, label)).repeat().batch(FLAGS.batch_size).prefetch(64)
  return d

def eval_input_fn():
  image = tf.zeros([784], dtype=tf.float32)
  label = tf.zeros([10], dtype=tf.int64)
  d = tf.data.Dataset.from_tensors((image, label)).repeat().batch(FLAGS.batch_size).prefetch(64)
  return d

def main(_):
  tf.logging.set_verbosity(tf.logging.INFO)

  estimator = tf.estimator.Estimator(
      model_fn=model_fn,
      model_dir=FLAGS.model_dir,
      config=tf.estimator.RunConfig(
          train_distribute=tf.contrib.distribute.ParameterServerStrategy(),
          session_config=tf.ConfigProto(
              log_device_placement=False,
              allow_soft_placement=True,
              gpu_options=tf.GPUOptions(
                  allow_growth=True,
                  force_gpu_compatible=True))))
  tf.logging.info('Estimator created.')
  
  train_spec = tf.estimator.TrainSpec(
      input_fn=train_input_fn,
      max_steps=FLAGS.max_steps)
  eval_spec = tf.estimator.EvalSpec(
      input_fn=eval_input_fn)

  tf.logging.info('Train and Eval specs created.')

  tf.estimator.train_and_evaluate(
      estimator,
      train_spec,
      eval_spec)

if __name__ == '__main__':
  env_dist = os.environ
  print(env_dist.get('TF_CONFIG'))

  tf.app.run()

Note: If the train_distribute attribute in tf.estimator.RunConfig is set to None, it works well.

@meibenjin
Author

Similar issue in the tensorflow repo: tensorflow/tensorflow#30121

@terrytangyuan
Member

@johnugeorge @richardsliu @gaocegege Looks like tf-operator needs to support evaluator in addition to ps and worker. An example cluster spec that has all these roles:

cluster = {'chief': ['host0:2222'],
           'evaluator': ['host6:2222'],
           'ps': ['host1:2222', 'host2:2222'],
           'worker': ['host3:2222', 'host4:2222', 'host5:2222']}

On the evaluator node, TF_CONFIG should look similar to the following, in order to tell TensorFlow to use this node for model evaluation:

os.environ['TF_CONFIG'] = json.dumps(
    {'cluster': cluster,
     'task': {'type': 'evaluator', 'index': 0}})
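
As a minimal sketch of what this would mean for every replica, the helper below builds one TF_CONFIG string per task from a cluster dictionary. The name `tf_configs_for_cluster` is hypothetical and only the standard `json` module is used; this is not how tf-operator generates the variable.

```python
import json


def tf_configs_for_cluster(cluster):
    """Return {(task_type, index): TF_CONFIG JSON string} for every task.

    Hypothetical helper for illustration; not part of TensorFlow or tf-operator.
    """
    configs = {}
    for task_type, hosts in cluster.items():
        for index in range(len(hosts)):
            configs[(task_type, index)] = json.dumps(
                {"cluster": cluster,
                 "task": {"type": task_type, "index": index}},
                sort_keys=True)
    return configs


cluster = {
    "chief": ["host0:2222"],
    "evaluator": ["host6:2222"],
    "ps": ["host1:2222", "host2:2222"],
    "worker": ["host3:2222", "host4:2222", "host5:2222"],
}

configs = tf_configs_for_cluster(cluster)
# Each replica would export its own entry, e.g. on the evaluator node:
evaluator_tf_config = configs[("evaluator", 0)]
```

Every replica shares the same `cluster` section and differs only in its `task` entry.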

@richardsliu richardsliu self-assigned this Mar 12, 2020
@meibenjin
Author

meibenjin commented Mar 13, 2020

@terrytangyuan If the evaluator is included in the TF_CONFIG cluster spec, some changes would have to be made in TensorFlow 1.12 (or above) for the case where the train_distribute attribute in tf.estimator.RunConfig is set to None. Otherwise the other nodes will wait for the evaluator forever.

@meibenjin
Author

@richardsliu @gaocegege I'd be glad to take the time to fix this problem if possible.

@johnugeorge
Member

LGTM

@meibenjin
Author

LGTM

Hi, do you mean that the way tf-operator currently sets the TF_CONFIG environment is correct?

@ashahab

ashahab commented Mar 16, 2020

@meibenjin Are you working on this? We are blocked on this too, and we'd like to create a patch if you haven't already (we would be glad to test your patch if there's an image).

@gaocegege
Member

@meibenjin A PR is welcome. I think it is a problem, but one question: what about older versions of TF?

@meibenjin
Author

I think some changes would have to be made in older TF versions if we add the evaluator to the TF_CONFIG cluster (namely, removing the evaluator from the TF_CONFIG cluster_spec). In our test, when the train_distribute attribute in tf.estimator.RunConfig is set to None, the other nodes wait for the evaluator forever.

@meibenjin
Author

meibenjin commented Mar 17, 2020

@ashahab Please see my reply to gaocegege. We should think about compatibility with older TF versions such as TensorFlow 1.12.

@gaocegege
Member

@meibenjin Thanks for the reply. I will comment soon after a deep dive into the TF code.

@gaocegege
Member

@terrytangyuan We already support it if there is no DistributionStrategy. DistributionStrategy calls a validation function that requires the evaluator to be in the cluster spec.

tensorflow/tensorflow#30121
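
To make the failure concrete, here is a simplified stand-in for that validation, assuming it only checks that the task type is a key of the cluster spec and that the index is in range. It is not TensorFlow's actual implementation; `validate_task_in_cluster` is a hypothetical name, and the first error message is copied from the TF 1.15 traceback in the original report.

```python
def validate_task_in_cluster(cluster_spec, task_type, task_id):
    """Simplified, hypothetical stand-in for the strategy's cluster validation."""
    if task_type not in cluster_spec:
        raise ValueError(
            "`task_type` %r not found in cluster_spec." % task_type)
    if not 0 <= task_id < len(cluster_spec[task_type]):
        raise ValueError(
            "`task_id` %d is out of range for task_type %r."
            % (task_id, task_type))


# Cluster spec as generated today: the evaluator is left out.
cluster_spec = {
    "ps": ["mbj-ps-0.default.svc:20002"],
    "chief": ["mbj-chief-0.default.svc:20001"],
    "worker": ["mbj-worker-0.default.svc:20000"],
}

try:
    validate_task_in_cluster(cluster_spec, "evaluator", 0)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

Under this assumption, any check of this shape rejects an evaluator task whose type is missing from the cluster spec, which matches the behaviour reported here.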

@gaocegege
Member

I think some changes would have to be made in older TF versions if we add the evaluator to the TF_CONFIG cluster (namely, removing the evaluator from the TF_CONFIG cluster_spec). In our test, when the train_distribute attribute in tf.estimator.RunConfig is set to None, the other nodes wait for the evaluator forever.

Yeah. It is what I worry about. All the replicas will wait for the evaluator session.

I am not sure why the distribution strategy needs such a validation function to require evaluators in cluster_spec. I am not sure whether it is a bug or a feature.

/cc @terrytangyuan Do you have any idea about it?

@terrytangyuan
Member

@gaocegege Not sure. It's probably due to some requirements in higher level APIs. We can bring this up in tensorflow/tensorflow#30121.

@richardsliu
Contributor

I have a PR fix here: #1146

But this will run into the issue mentioned by @meibenjin.

@chunyang-wen

Please have a look at tensorflow/tensorflow#27857 (comment). The master node is not officially supported, because the master will start an evaluator itself.

@stale

stale bot commented Jun 30, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot closed this as completed Jul 8, 2020
@Mesilenceki

Hey, do you have a clear solution? I am still running into this problem.

@pengyuan

I have the same problem. Is there a solution or workaround, please? @terrytangyuan

from estimator.run_config.py:

Example of evaluator node (evaluator is not part of training cluster):
    ```
      cluster = {'chief': ['host0:2222'],
                 'ps': ['host1:2222', 'host2:2222'],
                 'worker': ['host3:2222', 'host4:2222', 'host5:2222']}
      os.environ['TF_CONFIG'] = json.dumps(
          {'cluster': cluster,
           'task': {'type': 'evaluator', 'index': 0}})
      config = RunConfig()
      assert config.master == ''
      assert config.evaluator_master == ''
      assert config.task_id == 0
      assert config.num_ps_replicas == 0
      assert config.num_worker_replicas == 0
      assert config.cluster_spec == {}
      assert config.task_type == 'evaluator'
      assert not config.is_chief
    ```

@chunyang-wen

@pengyuan There is a workaround, although it is a little hacky. An Estimator parses all of its cluster information from the TF_CONFIG environment variable, so you can override TF_CONFIG with a suitable string before starting your own program. Be careful about the exit logic of the whole TensorFlow program, though. For example, suppose you have 1 ps, 1 master, and 2 workers. You can create a valid TF_CONFIG with 1 ps, 1 chief, 1 evaluator, and 1 worker. The evaluator should not appear in the cluster section of TF_CONFIG. You can choose any machine from the worker list as the evaluator.

The original TF_CONFIG may be:

TF_CONFIG = {
    "cluster": {
      "ps": ["localhost:port1"],
      "worker":["localhost:port4", "localhost:port2"],
      "master": ["localhost:port3"]
    },
    "task": {
      "type": "master",
      "index": 0
    }
}

We choose the first worker from the worker list as the evaluator.

The TF_CONFIG for each role is:

for chief

TF_CONFIG = {
  "cluster": {
    "ps": ["localhost:port1"],
    "worker": ["localhost:port2"],
    "chief": ["localhost:port3"]
  },
  "task": {
    "type": "chief",
    "index": 0
  }
}

for ps

TF_CONFIG = {
  "cluster": {
    "ps": ["localhost:port1"],
    "worker": ["localhost:port2"],
    "chief": ["localhost:port3"]
  },
  "task": {
    "type": "ps",
    "index": 0
  }
}

for worker

TF_CONFIG = {
  "cluster": {
    "ps": ["localhost:port1"],
    "worker": ["localhost:port2"],
    "chief": ["localhost:port3"]
  },
  "task": {
    "type": "worker",
    "index": 0
  }
}

for evaluator

TF_CONFIG = {
  "cluster": {
    "ps": ["localhost:port1"],
    "worker": ["localhost:port2"],
    "chief": ["localhost:port3"]
  },
  "task": {
    "type": "evaluator",
    "index": 0
  }
}
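
The rewrite described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the original TF_CONFIG has a single `master` and at least two workers, and the first worker is the one turned into the evaluator. The function names `rewrite_tf_config` and `override_tf_config_in_env` are hypothetical.

```python
import copy
import json
import os


def rewrite_tf_config(tf_config):
    """Return a new TF_CONFIG dict that uses chief/evaluator roles.

    Hypothetical helper: 'master' becomes 'chief', worker 0 becomes the
    evaluator, and the remaining workers are re-indexed from 0.
    """
    cluster = copy.deepcopy(tf_config["cluster"])
    task = dict(tf_config["task"])
    workers = cluster.get("worker", [])
    if len(workers) < 2:
        raise ValueError("Need at least two workers to spare one as evaluator.")

    # 'master' is renamed to 'chief' in both the cluster and the task.
    if "master" in cluster:
        cluster["chief"] = cluster.pop("master")
    if task["type"] == "master":
        task["type"] = "chief"
    elif task["type"] == "worker":
        if task["index"] == 0:
            # The first worker becomes the evaluator.
            task = {"type": "evaluator", "index": 0}
        else:
            # The remaining workers shift down by one.
            task["index"] -= 1

    # The evaluator's address leaves the worker list and is not added elsewhere.
    cluster["worker"] = workers[1:]
    return {"cluster": cluster, "task": task}


def override_tf_config_in_env():
    """Rewrite os.environ['TF_CONFIG']; call before creating the Estimator."""
    original = json.loads(os.environ["TF_CONFIG"])
    os.environ["TF_CONFIG"] = json.dumps(rewrite_tf_config(original))


original = {
    "cluster": {
        "ps": ["localhost:port1"],
        "worker": ["localhost:port4", "localhost:port2"],
        "master": ["localhost:port3"],
    },
    "task": {"type": "worker", "index": 0},
}
evaluator_config = rewrite_tf_config(original)
```

Each process would call `override_tf_config_in_env()` at startup. The sketch does not address the exit logic mentioned above.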
