
Multi-worker training with Keras only use one GPU #1169

Closed
LearnKen opened this issue Jun 8, 2020 · 1 comment

LearnKen commented Jun 8, 2020

System information

Ubuntu 16.04
1 Master   IP: 14X.XXX.XXX.1
node1      IP: 14X.XXX.XXX.8     GTX 1060
node2      IP: 14X.XXX.XXX.9     GTX 1060
node3      IP: 14X.XXX.XXX.10    GTX 1070
docker 18.09.7-3
cuda 10.0
nvidia-container-runtime=2.0.0
kubernetes 1.5.7
kubeflow 1.01

Describe the current behavior
I use multi-worker training with Keras, but it only uses one GPU.
Errors:
1. error: Internal: Complete shape not known for Adam/allreduce/CollectiveReduce_2
2. eval_fn is not passed in. The worker_fn will be used if an "evaluator" task exists in the cluster
3. Most importantly, it only runs on one GPU.
I run the code described below:

TEST 1 (3 machines):

[screenshot omitted]

TEST 2 (2 machines):

[screenshot omitted]

Describe the expected behavior

Use multiple GPUs.
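
As a quick sanity check (not part of the original report), each worker pod can print the GPUs it actually sees; with one nvidia.com/gpu per Worker in the spec below, every replica is expected to report exactly one device:

# minimal per-worker check, assuming the TF 2.1 image from the Dockerfile below
import tensorflow as tf
print("Visible GPUs:", tf.config.experimental.list_physical_devices('GPU'))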

GPU device

$ kubectl get nodes "-o=custom-columns=NAME:.metadata.name,GPU:.status.allocatable.nvidia\.com/gpu"

[screenshot of the kubectl output omitted]

My Dockerfile

FROM tensorflow/tensorflow:2.1.0-gpu-py3
RUN apt-get update
RUN apt-get install -y libsm6 libxext6 libxrender-dev
RUN pip install opencv-python
RUN pip install Pillow
RUN mkdir -p /app
ADD tp720_1.py /app/
COPY nspo /app/
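
For reference, this is roughly how the image would be built with the tag referenced by the TFJob spec below (build and publish details are not in the report and are an assumption):

# build the image with the tag used in the yaml
docker build -t nsporice:270_7 .
# the image then has to be reachable from every node that schedules a replica,
# e.g. pushed to a registry the cluster pulls from or built locally on each node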

My yaml

apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: nspo-rice
spec:
  tfReplicaSpecs:
    Chief:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
          name: tensorflow
        spec:
          containers:
          - command:
            - python
            - tp720_1.py
            image: nsporice:270_7
            name: tensorflow
            env:
            - name: test_tmpdir
              value: /app/data
            resources:
              limits:
                cpu: '1'
            volumeMounts:
            - mountPath: /app/data
              name: nspo-rice-volume
            workingDir: /app
          restartPolicy: Never
          volumes:
          - name: nspo-rice-volume
            persistentVolumeClaim:
              claimName: nspo-rice-volume
    Worker:
      replicas: 2
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
          name: tensorflow
        spec:
          containers:
          - command:
            - python
            - tp720_1.py
            image: nsporice:270_7
            name: tensorflow
            env:
            - name: test_tmpdir
              value: /app/data
            resources:
              limits:
                nvidia.com/gpu: 1
            volumeMounts:
            - mountPath: /app/data
              name: nspo-rice-volume
            workingDir: /app
          restartPolicy: Never
          volumes:
          - name: nspo-rice-volume
            persistentVolumeClaim:
              claimName: nspo-rice-volume
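
One detail worth noting in the spec above: the Chief replica only sets a CPU limit, so its replica joins training on the CPU while each Worker gets one GPU. If the chief is also meant to train on a GPU, its resources block could be extended along these lines (a sketch only, assuming the chief's node has a free GPU):

            resources:
              limits:
                cpu: '1'
                nvidia.com/gpu: 1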

My code

from os import walk
from os.path import join
import numpy as np
import matplotlib.pyplot as plt
import cv2
import time
import random
from scipy import signal
import os
import json
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, GRU, LSTM, TimeDistributed, RepeatVector, Bidirectional
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow.keras.backend as K
from tensorflow.keras.models import load_model
class TimeHistory(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.losses = {'batch':[], 'epoch':[]}
        self.accuracy = {'batch':[], 'epoch':[]}
        self.val_loss = {'batch':[], 'epoch':[]}
        self.val_acc = {'batch':[], 'epoch':[]}
        self.times = []
        self.totaltime = time.time()
        
    def on_train_end(self, logs={}):
        self.totaltime = time.time() - self.totaltime
    
    def on_batch_end(self, batch, logs={}):
        self.losses['batch'].append(logs.get('loss'))
        self.accuracy['batch'].append(logs.get('acc'))
        self.val_loss['batch'].append(logs.get('val_loss'))
        self.val_acc['batch'].append(logs.get('val_acc'))

    def on_epoch_begin(self, batch, logs={}):
        self.epoch_time_start = time.time()
        
    def on_epoch_end(self, batch, logs={}):
        self.losses['epoch'].append(logs.get('loss'))
        
        self.accuracy['epoch'].append(logs.get('acc'))
        self.val_loss['epoch'].append(logs.get('val_loss'))
        self.val_acc['epoch'].append(logs.get('val_acc'))
        self.times.append(time.time() - self.epoch_time_start)
   
    def loss_plot(self, loss_type):
        acc =[]
        loss = []
        val = []
        iters = range(len(self.losses[loss_type]))
        # acc
        acc.extend(self.accuracy[loss_type])
        # loss
        print("loss = ",self.losses[loss_type])
        loss.extend(self.losses[loss_type])
        # val
        print("val = ",self.losses[loss_type])
        val.extend(self.val_acc[loss_type])
        return(acc, loss,val)
time_callback = TimeHistory()
      
def buildManyToOneModel(shape):

    model = tf.keras.models.Sequential([
        GRU(32, input_dim = shape[2], input_length = shape[1], return_sequences = True),
        GRU(64, return_sequences = True),
        GRU(128, return_sequences = False),
        #LSTM(16, return_sequences = True),
        #LSTM(16, return_sequences = False),
        BatchNormalization(),
        Dense(1, activation='sigmoid')
    ])
    model.compile(loss='mse', optimizer = 'adam', metrics=['acc'])
    model.summary()
    return model

def slice_(data, node_num):
    total = float(data.shape[0])
    store = []
    if node_num == 1:
        store.append(data[0:int(total),:])
        store.append([0])
        store.append([0])
    elif node_num == 2:
        slice_index = int(total / 2)
        store.append(data[0:slice_index, :])
        store.append(data[slice_index:int(total), :])
        store.append([0])
    elif node_num == 3:
        slice_index = int(total / 3)
        store.append(data[0:slice_index, :])
        store.append(data[slice_index:2*slice_index, :])
        store.append(data[2*slice_index:int(total), :])
    return store

def train():
    print("TensorFlow version: ", tf.__version__)
    tf_config = os.environ.get('TF_CONFIG', '{}')
    print("TF_CONFIG %s", tf_config)
    tf_config_json = json.loads(tf_config)
    cluster = tf_config_json.get('cluster')
    job_name = tf_config_json.get('task', {}).get('type')
    task_index = tf_config_json.get('task', {}).get('index')

    print("cluster={} job_name={} task_index={}}", cluster, job_name, task_index)
    
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(e)
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(communication=tf.distribute.experimental.CollectiveCommunication.RING)
    print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))

    data0 = np.load('/app/data/2017_360w_data_0.npy')
    data1 = np.load('/app/data/2017_360w_data_1.npy')
    data0 = data0[:,:,0]
    data1 = data1[:,:,0]
    print("data0:",data0.shape)
    print("data1:",data1.shape)
    a1 = np.array(data0)[0 : int(data0.shape[0]*0.7), :]
    a2 = np.array(data1)[0 : int(data1.shape[0]*0.7), :]
    a3 = np.array(data0)[int(data0.shape[0]*0.7) : data0.shape[0], :]
    a4 = np.array(data1)[int(data1.shape[0]*0.7) : data1.shape[0], :]
    X_train = np.concatenate((a1, a2), axis=0) 
    X_val = np.concatenate((a3, a4), axis=0) 

    b1 = np.zeros((a1.shape[0], 1))
    b2 = np.ones((a2.shape[0], 1))

    b3 = np.zeros((a3.shape[0], 1))

    b4 = np.ones((a4.shape[0], 1))

    Y_train = np.concatenate((b1, b2), axis=0)
    Y_val = np.concatenate((b3, b4), axis=0)
    X_train = X_train.astype(np.float32)
    Y_train = Y_train.astype(np.float32)
    X_val = X_val.astype(np.float32)
    Y_val = Y_val.astype(np.float32)
    X_train = X_train[:,:,np.newaxis]

    X_val = X_val[:,:,np.newaxis]
    print(X_train.shape)
    print(Y_train.shape)
    print(X_val.shape)
    print(Y_val.shape)
    print(type(X_train)) 
    print(type(Y_train))    
    BUFFER_SIZE = X_train.shape[0]
 
    BATCH_SIZE_PER_REPLICA = 5000
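    # note: the -1 below seems intended to exclude one replica (presumably the
    # CPU-only chief) from the global batch; tf.distribute examples normally scale
    # by the full num_replicas_in_sync.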
    GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA  * (strategy.num_replicas_in_sync-1)
    
    #train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
    #test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset) 
   
    if BUFFER_SIZE % GLOBAL_BATCH_SIZE != 0:
        parallel_steps =  X_train.shape[0] // GLOBAL_BATCH_SIZE + 1
        a =  X_val.shape[0] // GLOBAL_BATCH_SIZE + 1
    else:
        parallel_steps =  X_train.shape[0] // GLOBAL_BATCH_SIZE
        a =  X_val.shape[0] // GLOBAL_BATCH_SIZE         
    print(parallel_steps) 
    t2 = time.time()
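    # the Keras model must be created and compiled inside strategy.scope() so its
    # variables are mirrored across all workers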
    with strategy.scope():
         train_dataset = tf.data.Dataset.from_tensor_slices((X_train, Y_train)).repeat().shuffle(buffer_size=5000000).batch(GLOBAL_BATCH_SIZE)
         #test_dataset = tf.data.Dataset.from_tensor_slices((X_val, Y_val)).batch(GLOBAL_BATCH_SIZE) 
         options = tf.data.Options()
         options.experimental_distribute.auto_shard_policy = \
                                        tf.data.experimental.AutoShardPolicy.DATA
         train_dataset = train_dataset.with_options(options)    
         #test_dataset = test_dataset.with_options(options)        
         multi_worker_model = buildManyToOneModel(X_train.shape)    

    #history = multi_worker_model.fit(train_dataset, epochs=30, validation_data=test_dataset,steps_per_epoch=parallel_steps,validation_steps=a,callbacks=[time_callback])
    history = multi_worker_model.fit(train_dataset, epochs=30,steps_per_epoch=parallel_steps,callbacks=[time_callback])
    t3 = time.time()    
    print ("It cost ", t3 - t2, " seconds")


    accuracy, loss,val = time_callback.loss_plot('epoch')
    print("totaltime:%.4f"%time_callback.totaltime)
    for i in range(len(accuracy)):
        print("acc: %.4f, loss: %.4f,val:%.4f -----epoch:%d" %(accuracy[i], loss[i],val[i],i+1))
    totaltime='%.2f'%time_callback.totaltime
    ttime= t3 - t2
    trainingtime='%.2f'%ttime
    maxval='%.2f'%max(val)
    save_dir = '/app/data'
    f = open(save_dir+"/720_1.txt", "a")
    f.write("\ntotaltime:{},trainingtime:{},val:{}".format(totaltime,trainingtime,maxval))
    f.close()
if __name__ == '__main__':
    train()  
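
If it is unclear which device each replica actually uses, a minimal check (not in the original script) is to turn on device-placement logging before the strategy is created; each worker should then log its ops on its own /device:GPU:0:

# log the device chosen for every op (available in TF 2.x)
import tensorflow as tf
tf.debugging.set_log_device_placement(True)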

Pod logs

2020-06-08 12:52:04.146322: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libnvinfer.so.6
2020-06-08 12:52:04.147962: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libnvinfer_plugin.so.6
2020-06-08 12:52:04.788595: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcuda.so.1
2020-06-08 12:52:04.795356: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties: 
pciBusID: 0000:05:00.0 name: GeForce GTX 1070 computeCapability: 6.1
coreClock: 1.7715GHz coreCount: 15 deviceMemorySize: 7.93GiB deviceMemoryBandwidth: 238.66GiB/s
2020-06-08 12:52:04.795467: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-06-08 12:52:04.795550: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-06-08 12:52:04.798357: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-06-08 12:52:04.799032: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-06-08 12:52:04.802641: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-06-08 12:52:04.804156: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-06-08 12:52:04.804221: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-06-08 12:52:04.805601: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-06-08 12:52:04.806525: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2020-06-08 12:52:04.814745: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2198720000 Hz
2020-06-08 12:52:04.815811: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x5c54090 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-06-08 12:52:04.815832: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-06-08 12:52:04.933082: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x5cb9890 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2020-06-08 12:52:04.933126: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): GeForce GTX 1070, Compute Capability 6.1
2020-06-08 12:52:04.934468: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties: 
pciBusID: 0000:05:00.0 name: GeForce GTX 1070 computeCapability: 6.1
coreClock: 1.7715GHz coreCount: 15 deviceMemorySize: 7.93GiB deviceMemoryBandwidth: 238.66GiB/s
2020-06-08 12:52:04.934575: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-06-08 12:52:04.934620: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-06-08 12:52:04.934658: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-06-08 12:52:04.934694: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-06-08 12:52:04.934746: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-06-08 12:52:04.934794: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-06-08 12:52:04.934850: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-06-08 12:52:04.938735: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-06-08 12:52:04.938852: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-06-08 12:52:05.301356: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1096] Device interconnect StreamExecutor with strength 1 edge matrix:
2020-06-08 12:52:05.301383: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1102]      0 
2020-06-08 12:52:05.301391: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1115] 0:   N 
2020-06-08 12:52:05.302813: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1241] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 6927 MB memory) -> physical GPU (device: 0, name: GeForce GTX 1070, pci bus id: 0000:05:00.0, compute capability: 6.1)
2020-06-08 12:52:05.305432: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties: 
pciBusID: 0000:05:00.0 name: GeForce GTX 1070 computeCapability: 6.1
coreClock: 1.7715GHz coreCount: 15 deviceMemorySize: 7.93GiB deviceMemoryBandwidth: 238.66GiB/s
2020-06-08 12:52:05.305510: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-06-08 12:52:05.305552: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-06-08 12:52:05.305590: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-06-08 12:52:05.305617: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-06-08 12:52:05.305649: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-06-08 12:52:05.305686: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-06-08 12:52:05.305721: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-06-08 12:52:05.306950: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-06-08 12:52:05.306975: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1096] Device interconnect StreamExecutor with strength 1 edge matrix:
2020-06-08 12:52:05.306983: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1102]      0 
2020-06-08 12:52:05.306988: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1115] 0:   N 
2020-06-08 12:52:05.308211: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1241] Created TensorFlow device (/job:worker/replica:0/task:1/device:GPU:0 with 6927 MB memory) -> physical GPU (device: 0, name: GeForce GTX 1070, pci bus id: 0000:05:00.0, compute capability: 6.1)
2020-06-08 12:52:05.314693: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:300] Initialize GrpcChannelCache for job chief -> {0 -> nspo-rice-chief-0.kubeflow.svc:2222}
2020-06-08 12:52:05.314716: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:300] Initialize GrpcChannelCache for job worker -> {0 -> nspo-rice-worker-0.kubeflow.svc:2222, 1 -> localhost:2222}
2020-06-08 12:52:05.315663: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:390] Started server with target: grpc://localhost:2222
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
2020-06-08 12:52:16.277123: W tensorflow/core/grappler/optimizers/scoped_allocator_optimizer.cc:440] error: Internal: Complete shape not known for Adam/allreduce/CollectiveReduce_2
2020-06-08 12:52:16.277150: W tensorflow/core/grappler/optimizers/scoped_allocator_optimizer.cc:1056] error: Internal: Complete shape not known for Adam/allreduce/CollectiveReduce_2
2020-06-08 12:52:16.277227: E tensorflow/core/grappler/optimizers/scoped_allocator_optimizer.cc:1073] ScopedAllocatorOptimizer: Internal: Complete shape not known for Adam/allreduce/CollectiveReduce_2
2020-06-08 12:52:16.277234: W tensorflow/core/grappler/optimizers/scoped_allocator_optimizer.cc:846] error: Internal: Complete shape not known for Adam/allreduce/CollectiveReduce_2
2020-06-08 12:52:16.277886: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:561] scoped_allocator_optimizer failed: Internal: Complete shape not known for Adam/allreduce/CollectiveReduce_2
2020-06-08 12:52:16.672402: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
TensorFlow version:  2.1.0
TF_CONFIG %s {"cluster":{"chief":["nspo-rice-chief-0.kubeflow.svc:2222"],"worker":["nspo-rice-worker-0.kubeflow.svc:2222","nspo-rice-worker-1.kubeflow.svc:2222"]},"task":{"type":"worker","index":1},"environment":"cloud"}
cluster={} job_name={} task_index={}} {'chief': ['nspo-rice-chief-0.kubeflow.svc:2222'], 'worker': ['nspo-rice-worker-0.kubeflow.svc:2222', 'nspo-rice-worker-1.kubeflow.svc:2222']} worker 1
Number of devices: 3
data0: (1760157, 14)
data1: (1839843, 14)
(2519999, 14, 1)
(2519999, 1)
(1080001, 14, 1)
(1080001, 1)
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
252
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
gru (GRU)                    (None, 14, 32)            3360      
_________________________________________________________________
gru_1 (GRU)                  (None, 14, 64)            18816     
_________________________________________________________________
gru_2 (GRU)                  (None, 128)               74496     
_________________________________________________________________
batch_normalization (BatchNo (None, 128)               512       
_________________________________________________________________
dense (Dense)                (None, 1)                 129       
=================================================================
Total params: 97,313
Trainable params: 97,057
Non-trainable params: 256
_________________________________________________________________
Train for 252 steps
Epoch 1/30
252/252 [==============================] - 53s 209ms/step - loss: 0.1431 - acc: 0.8200
Epoch 2/30
252/252 [==============================] - 38s 149ms/step - loss: 0.0953 - acc: 0.8733
Epoch 3/30
252/252 [==============================] - 38s 149ms/step - loss: 0.0890 - acc: 0.8798
@issue-label-bot

Issue-Label Bot is automatically applying the labels:

Label Probability
area/tfjob 0.79

