
Using Keras for distributed training raises RuntimeError("Graph is finalized and cannot be modified.") #3997

Closed
allenwoods opened this issue Oct 8, 2016 · 8 comments

@allenwoods

I'm using Keras for distributed training with the following code:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Created by Enigma on 2016/9/26

import numpy as np
import tensorflow as tf

# Define Hyperparameters
FLAGS = tf.app.flags.FLAGS

# For missions
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

# Hyperparameters

from keras import backend as K
from keras.layers import Input, Dense
from keras.models import Model


def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    server_config = tf.ConfigProto(
        gpu_options=tf.GPUOptions(allow_growth=True),
        log_device_placement=True)
    server = tf.train.Server(cluster, config=server_config,
                             job_name=FLAGS.job_name, task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d/cpu:0" % FLAGS.task_index,
                cluster=cluster)):
            global_step = tf.Variable(0, name='global_step', trainable=False)
            inputs = Input(shape=[1, ])
            hidden = Dense(10, activation='relu')(inputs)
            output = Dense(1, activation='sigmoid')(hidden)
            model = Model(input=inputs, output=output)

            saver = tf.train.Saver()
            summary_op = tf.merge_all_summaries()

        sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                                 logdir="./checkpoint/",
                                 # init_op=init_op,
                                 summary_op=summary_op,
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=60)
        with sv.managed_session(server.target) as sess:
            step = 0
            K.set_session(sess)
            model.compile(optimizer='sgd', loss='mse')
            while step < 1000000:
                train_x = np.random.randn(1)
                train_y = 2 * train_x + np.random.randn(1) * 0.33 + 10
                model.fit(train_x, train_y)
        sv.stop()

if __name__ == "__main__":
    tf.app.run()

Then I run it with:

/opt/anaconda3/bin/python /cache/allenwoods/keras_dis_test.py --ps_hosts=0.0.0.0:48636 --worker_hosts=0.0.0.0:46261 --job_name=ps --task_index=0
/opt/anaconda3/bin/python /cache/allenwoods/keras_dis_test.py --ps_hosts=0.0.0.0:48636 --worker_hosts=0.0.0.0:46261 --job_name=worker --task_index=0

It doesn't work and returns:

Traceback (most recent call last):
  File "/cache/allenwoods/keras_dis_test.py", line 73, in <module>
    tf.app.run()
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/platform/app.py", line 30, in run
    sys.exit(main(sys.argv[:1] + flags_passthrough))
  File "/cache/allenwoods/keras_dis_test.py", line 69, in main
    model.fit(train_x, train_y)
  File "/opt/anaconda3/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/training/supervisor.py", line 969, in managed_session
    self.stop(close_summary_writer=close_summary_writer)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/training/supervisor.py", line 797, in stop
    stop_grace_period_secs=self._stop_grace_secs)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/training/coordinator.py", line 386, in join
    six.reraise(*self._exc_info_to_raise)
  File "/opt/anaconda3/lib/python3.5/site-packages/six.py", line 686, in reraise
    raise value
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/training/supervisor.py", line 959, in managed_session
    yield sess
  File "/cache/allenwoods/VRLforTraffic/src/missions/keras_dis_test.py", line 65, in main
    model.compile(optimizer='sgd', loss='mse')
  File "/opt/anaconda3/lib/python3.5/site-packages/keras/engine/training.py", line 484, in compile
    self.optimizer = optimizers.get(optimizer)
  File "/opt/anaconda3/lib/python3.5/site-packages/keras/optimizers.py", line 580, in get
    instantiate=True, kwargs=kwargs)
  File "/opt/anaconda3/lib/python3.5/site-packages/keras/utils/generic_utils.py", line 18, in get_from_module
    return res()
  File "/opt/anaconda3/lib/python3.5/site-packages/keras/optimizers.py", line 134, in __init__
    self.iterations = K.variable(0.)
  File "/opt/anaconda3/lib/python3.5/site-packages/keras/backend/tensorflow_backend.py", line 149, in variable
    v = tf.Variable(value, dtype=_convert_string_dtype(dtype), name=name)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/ops/variables.py", line 215, in __init__
    dtype=dtype)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/ops/variables.py", line 327, in _init_from_args
    self._snapshot = array_ops.identity(self._variable, name="read")
  File "/opt/anaconda3/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 4150, in name_scope
    yield scope
  File "/opt/anaconda3/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 3645, in get_controller
    yield default
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 4150, in name_scope
    yield scope
  File "/opt/anaconda3/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 2891, in name_scope
    yield "" if new_stack is None else new_stack + "/"
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 4150, in name_scope
    yield scope
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/ops/variables.py", line 293, in _init_from_args
    initial_value, name="initial_value", dtype=dtype)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 657, in convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/constant_op.py", line 180, in _constant_tensor_conversion_function
    return constant(v, dtype=dtype, name=name)
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/constant_op.py", line 167, in constant
    attrs={"value": tensor_value, "dtype": dtype_value}, name=name).outputs[0]
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 2339, in create_op
    self._check_not_finalized()
  File "/opt/anaconda3/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 2080, in _check_not_finalized
    raise RuntimeError("Graph is finalized and cannot be modified.")
RuntimeError: Graph is finalized and cannot be modified.

I wonder if this happens because the Keras model wasn't created as part of the graph used by tf.train.Supervisor, but I have no clue how to prove or fix it. Any ideas?
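
One way to check that hypothesis (just a sketch, with a made-up helper name, not a verified fix) is to compare the graph the Keras tensors were created on with the default graph that the Supervisor later finalizes:

# Sketch: verify the Keras model lives on the default graph that
# tf.train.Supervisor will finalize. Call this after building `model`
# and before creating the Supervisor.
import tensorflow as tf

def model_on_default_graph(model):
    default_graph = tf.get_default_graph()
    # Keras functional models expose their input/output tensors as lists.
    return all(t.graph is default_graph for t in model.inputs + model.outputs)

# Usage inside main(), right after `model = Model(...)`:
# assert model_on_default_graph(model), "Keras model is on a different graph"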

@hbhuang
Copy link

hbhuang commented Oct 20, 2016

I have the same problem (Graph is finalized and cannot be modified.). Can anyone help me?

@24suixinsuoyu
Copy link

I use TensorFlow instead of Keras, and I run into the same problem.

@ghost
Copy link

ghost commented Feb 24, 2017

I have the same problem (Graph is finalized and cannot be modified.). Can anyone help me?

@stale

stale bot commented May 25, 2017

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 30 days if no further activity occurs, but feel free to re-open a closed issue if needed.

stale bot closed this as completed on Jun 25, 2017
@nmoezzi

nmoezzi commented Aug 31, 2017

Did anyone manage to make this code work? Or is it even possible to make it work this way?

@PBehr

PBehr commented Sep 20, 2017

The graph gets finalized when tf.train.Supervisor is created. We need to compile the model and call _make_train_function (and, if needed, _make_test_function and _make_predict_function) before creating the supervisor.
We also need to set K.manual_variable_initialization(True).

I updated the code above; it should run now.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Created by Enigma on 2016/9/26

import numpy as np
import tensorflow as tf

# Define Hyperparameters
FLAGS = tf.app.flags.FLAGS

# For missions
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

# Hyperparameters

from keras import backend as K
from keras.layers import Input, Dense
from keras.models import Model


def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    server_config = tf.ConfigProto(
        gpu_options=tf.GPUOptions(allow_growth=True),
        log_device_placement=True)
    server = tf.train.Server(cluster, config=server_config,
                             job_name=FLAGS.job_name, task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d/cpu:0" % FLAGS.task_index,
                cluster=cluster)):
            global_step = tf.Variable(0, name='global_step', trainable=False)
            inputs = Input(shape=[1, ])
            hidden = Dense(10, activation='relu')(inputs)
            output = Dense(1, activation='sigmoid')(hidden)
            model = Model(input=inputs, output=output)


            saver = tf.train.Saver()

            model.compile(optimizer='sgd', loss='mse')
            model._make_train_function()
            model._make_test_function()

        sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                                 logdir="./checkpoint/",
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=60)
        with sv.managed_session(server.target) as sess:
            step = 0
            K.set_session(sess)
            K.manual_variable_initialization(True)

            while step < 1000000:
                train_x = np.random.randn(1)
                train_y = 2 * train_x + np.random.randn(1) * 0.33 + 10
                model.fit(train_x, train_y)
                step += 1  # advance the counter so the loop condition can eventually terminate
        sv.stop()

if __name__ == "__main__":
    tf.app.run()

@mattdornfeld

@PBehr This is awesome! Any ideas on how to increment the global step?
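
One possible approach (a sketch only, not something confirmed in this thread): build an explicit tf.assign_add op on the global_step variable before the Supervisor finalizes the graph, then run it once per training iteration.

# Sketch: increment global_step manually. Build the op inside the
# tf.device(...) block, before tf.train.Supervisor is created:
increment_global_step = tf.assign_add(global_step, 1, name='increment_global_step')

# ...then, inside the managed session's training loop:
# model.fit(train_x, train_y)
# sess.run(increment_global_step)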

@mas-dse-greina

I'm getting an error (below) from your code which I think has to do with the feed_dict. Any idea how to solve this? Did you run into this with your code?

Traceback (most recent call last):
  File "keras_dist.py", line 97, in <module>
    tf.app.run()
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/platform/app.py", line 48, in run
    _sys.exit(main(_sys.argv[:1] + flags_passthrough))
  File "keras_dist.py", line 86, in main
    with sv.managed_session(server.target) as sess:
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 964, in managed_session
    self.stop(close_summary_writer=close_summary_writer)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 792, in stop
    stop_grace_period_secs=self._stop_grace_secs)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 389, in join
    six.reraise(*self._exc_info_to_raise)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 953, in managed_session
    start_standard_services=start_standard_services)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 708, in prepare_or_wait_for_session
    init_feed_dict=self._init_feed_dict, init_fn=self._init_fn)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/training/session_manager.py", line 279, in prepare_session
    sess.run(init_op, feed_dict=init_feed_dict)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 889, in run
    run_metadata_ptr)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1120, in _run
    feed_dict_tensor, options, run_metadata)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1317, in _do_run
    options, run_metadata)
  File "/home/bduser/miniconda2/envs/tf/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1336, in _do_call
    raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.UnavailableError: Endpoint read failed
