From b73ed9780a1a21c5a1f2068bc6cb017857ee6f49 Mon Sep 17 00:00:00 2001
From: Philipp Moritz
Date: Tue, 8 Dec 2015 12:09:59 -0800
Subject: [PATCH] multi GPU support on one node

---
 caffe/include/caffe/parallel.hpp                  |  3 +-
 .../bvlc_googlenet/quick_solver.prototxt          |  2 -
 .../models/bvlc_googlenet/train_val.prototxt      | 26 +-------
 caffe/src/caffe/parallel.cpp                      | 11 ++--
 caffe/src/caffe/util/io.cpp                       |  2 +-
 cmake/Modules/FindLMDB.cmake                      | 24 +++++++
 ec2/create_labelfile.py                           |  8 +--
 ec2/pull.py                                       | 30 ++++-----
 libccaffe/CMakeLists.txt                          |  6 ++
 libccaffe/ccaffe.cpp                              | 60 +++++++++++++---
 libccaffe/ccaffe.h                                |  3 +-
 src/main/java/libs/CaffeLibrary.java              |  3 +-
 src/main/scala/apps/MultiGPUApp.scala             | 62 +++++++++++++++++++
 src/main/scala/libs/Net.scala                     |  8 ++-
 14 files changed, 183 insertions(+), 65 deletions(-)
 create mode 100644 cmake/Modules/FindLMDB.cmake
 create mode 100644 src/main/scala/apps/MultiGPUApp.scala

diff --git a/caffe/include/caffe/parallel.hpp b/caffe/include/caffe/parallel.hpp
index 85fc2b5..a4d14a5 100644
--- a/caffe/include/caffe/parallel.hpp
+++ b/caffe/include/caffe/parallel.hpp
@@ -93,7 +93,8 @@ class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
     return solver_;
   }
 
-  void run(const vector<int>& gpus);
+  vector<shared_ptr<P2PSync<Dtype> > > initialize(const vector<int>& gpus);
+  void step(vector<shared_ptr<P2PSync<Dtype> > > syncs, int num_steps);
 
  protected:
   void on_start();
diff --git a/caffe/models/bvlc_googlenet/quick_solver.prototxt b/caffe/models/bvlc_googlenet/quick_solver.prototxt
index 5d2f7ee..addc520 100644
--- a/caffe/models/bvlc_googlenet/quick_solver.prototxt
+++ b/caffe/models/bvlc_googlenet/quick_solver.prototxt
@@ -1,6 +1,4 @@
 net: "models/bvlc_googlenet/train_val.prototxt"
-test_iter: 1000
-test_interval: 4000
 test_initialization: false
 display: 40
 average_loss: 40
diff --git a/caffe/models/bvlc_googlenet/train_val.prototxt b/caffe/models/bvlc_googlenet/train_val.prototxt
index 5dee3ab..0c67d52 100644
--- a/caffe/models/bvlc_googlenet/train_val.prototxt
+++ b/caffe/models/bvlc_googlenet/train_val.prototxt
@@ -4,9 +4,6 @@ layer {
   type: "Data"
   top: "data"
   top: "label"
-  include {
-    phase: TRAIN
-  }
   transform_param {
     mirror: true
     crop_size: 224
@@ -15,32 +12,11 @@ layer {
     mean_value: 123
   }
   data_param {
-    source: "examples/imagenet/ilsvrc12_train_lmdb"
+    source: "/imgnet/traindb"
     batch_size: 32
     backend: LMDB
   }
 }
-layer {
-  name: "data"
-  type: "Data"
-  top: "data"
-  top: "label"
-  include {
-    phase: TEST
-  }
-  transform_param {
-    mirror: false
-    crop_size: 224
-    mean_value: 104
-    mean_value: 117
-    mean_value: 123
-  }
-  data_param {
-    source: "examples/imagenet/ilsvrc12_val_lmdb"
-    batch_size: 50
-    backend: LMDB
-  }
-}
 layer {
   name: "conv1/7x7_s2"
   type: "Convolution"
diff --git a/caffe/src/caffe/parallel.cpp b/caffe/src/caffe/parallel.cpp
index 62f5d73..5720801 100644
--- a/caffe/src/caffe/parallel.cpp
+++ b/caffe/src/caffe/parallel.cpp
@@ -380,7 +380,7 @@ void P2PSync<Dtype>::on_gradients_ready() {
 }
 
 template<typename Dtype>
-void P2PSync<Dtype>::run(const vector<int>& gpus) {
+vector<shared_ptr<P2PSync<Dtype> > > P2PSync<Dtype>::initialize(const vector<int>& gpus) {
   // Pair devices for map-reduce synchronization
   vector<DevicePair> pairs;
   DevicePair::compute(gpus, &pairs);
@@ -418,13 +418,16 @@
 
   LOG(INFO)<< "Starting Optimization";
 
+  return syncs;
+}
+
+template<typename Dtype>
+void P2PSync<Dtype>::step(vector<shared_ptr<P2PSync<Dtype> > > syncs, int num_steps) {
   for (int i = 1; i < syncs.size(); ++i) {
     syncs[i]->StartInternalThread();
   }
 
-  // Run root solver on current thread
-  solver_->Solve();
-
+  solver_->Step(num_steps);
   for (int i = 1; i < syncs.size(); ++i) {
     syncs[i]->StopInternalThread();
   }
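The change to P2PSync splits the old blocking run() into initialize(), which pairs the GPUs and creates one worker P2PSync per device, and step(), which runs a fixed number of SGD iterations before returning control to the caller. The sketch below shows how a caller might drive the new interface; it is illustrative only and not part of the patch. It assumes a root P2PSync and a GPU list constructed the way libccaffe/ccaffe.cpp does further down, and the name train_in_rounds is made up:

    #include <vector>
    #include "caffe/parallel.hpp"

    // Hypothetical driver for the refactored interface. `root_sync` must be
    // the root P2PSync (built from the root solver), `gpus` the device ids.
    void train_in_rounds(caffe::P2PSync<float>& root_sync,
                         const std::vector<int>& gpus) {
      // One-time setup: pair the devices and build the per-GPU solvers.
      std::vector<boost::shared_ptr<caffe::P2PSync<float> > > syncs =
          root_sync.initialize(gpus);
      for (int round = 0; round < 10; ++round) {
        // Run 50 SGD steps on every GPU, then return control to the caller,
        // e.g. to exchange weights with other machines, as SparkNet does.
        root_sync.step(syncs, 50);
      }
    }

The point of the split is exactly this interleaving: the old run() handed the thread to a single Solve() call until training finished, which leaves no hook for SparkNet's periodic parameter averaging.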
diff --git a/caffe/src/caffe/util/io.cpp b/caffe/src/caffe/util/io.cpp
index f2b1dd9..835d2d4 100644
--- a/caffe/src/caffe/util/io.cpp
+++ b/caffe/src/caffe/util/io.cpp
@@ -2,8 +2,8 @@
 #include <google/protobuf/io/coded_stream.h>
 #include <google/protobuf/io/zero_copy_stream_impl.h>
 #include <google/protobuf/text_format.h>
-#include <opencv2/core/core.hpp>
 #ifdef USE_OPENCV
+#include <opencv2/core/core.hpp>
 #include <opencv2/highgui/highgui.hpp>
 #include <opencv2/highgui/highgui_c.h>
 #include <opencv2/imgproc/imgproc.hpp>
diff --git a/cmake/Modules/FindLMDB.cmake b/cmake/Modules/FindLMDB.cmake
new file mode 100644
index 0000000..5a9e819
--- /dev/null
+++ b/cmake/Modules/FindLMDB.cmake
@@ -0,0 +1,24 @@
+# Try to find the LMDB libraries and headers
+#  LMDB_FOUND - system has LMDB lib
+#  LMDB_INCLUDE_DIR - the LMDB include directory
+#  LMDB_LIBRARIES - Libraries needed to use LMDB
+
+# FindCWD based on FindGMP by:
+# Copyright (c) 2006, Laurent Montel, <montel@kde.org>
+#
+# Redistribution and use is allowed according to the terms of the BSD license.
+
+# Adapted from FindCWD by:
+# Copyright 2013 Conrad Steenberg <conrad.steenberg@gmail.com>
+# Aug 31, 2013
+
+find_path(LMDB_INCLUDE_DIR NAMES lmdb.h PATHS "$ENV{LMDB_DIR}/include")
+find_library(LMDB_LIBRARIES NAMES lmdb PATHS "$ENV{LMDB_DIR}/lib")
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(LMDB DEFAULT_MSG LMDB_INCLUDE_DIR LMDB_LIBRARIES)
+
+if(LMDB_FOUND)
+  message(STATUS "Found lmdb (include: ${LMDB_INCLUDE_DIR}, library: ${LMDB_LIBRARIES})")
+  mark_as_advanced(LMDB_INCLUDE_DIR LMDB_LIBRARIES)
+endif()
diff --git a/ec2/create_labelfile.py b/ec2/create_labelfile.py
index 729d9d8..fd55af0 100644
--- a/ec2/create_labelfile.py
+++ b/ec2/create_labelfile.py
@@ -12,12 +12,12 @@
 
 trainfile = open(args.trainfile, 'r')
 for line in trainfile.readlines():
-  (fname, label) = line.split()
-  labelmap[fname.upper()] = label  # poor man's file name normalization
+    (fname, label) = line.split()
+    labelmap[fname.upper()] = label  # poor man's file name normalization
 trainfile.close()
 
 outfile = open(args.outfile, 'w')
 for root, dirs, files in os.walk(args.directory):
-  for f in files:
-    outfile.write(f + " " + labelmap[f.upper()] + "\n")
+    for f in files:
+        outfile.write(f + " " + labelmap[f.upper()] + "\n")
 outfile.close()
diff --git a/ec2/pull.py b/ec2/pull.py
index 8e1e259..616cc90 100644
--- a/ec2/pull.py
+++ b/ec2/pull.py
@@ -20,25 +20,25 @@
 args = parser.parse_args()
 
 def download_files(directory, tar_path):
-  response = s3.get_object(Bucket='sparknet', Key=tar_path)
+    response = s3.get_object(Bucket='sparknet', Key=tar_path)
 
-  output = io.BytesIO()
+    output = io.BytesIO()
 
-  chunk = response['Body'].read(1024 * 8)
-  while chunk:
-    output.write(chunk)
-    chunk = response['Body'].read(1024 * 8)
+    chunk = response['Body'].read(1024 * 8)
+    while chunk:
+        output.write(chunk)
+        chunk = response['Body'].read(1024 * 8)
 
-  output.seek(0)  # go to the beginning of the .tar file
+    output.seek(0)  # go to the beginning of the .tar file
 
-  tar = tarfile.open(mode="r", fileobj=output)
+    tar = tarfile.open(mode="r", fileobj=output)
 
-  for member in tar.getmembers():
-    filename = member.path  # in a format like 'n02099601_3085.JPEG'
-    content = tar.extractfile(member)
-    out = open(os.path.join(directory, filename), 'w')
-    out.write(content.read())
-    out.close()
+    for member in tar.getmembers():
+        filename = member.path  # in a format like 'n02099601_3085.JPEG'
+        content = tar.extractfile(member)
+        out = open(os.path.join(directory, filename), 'w')
+        out.write(content.read())
+        out.close()
 
 directory = os.path.join(args.directory, '%03d-%03d' % (args.start_idx, args.stop_idx))
@@ -46,4 +46,4 @@ def download_files(directory, tar_path):
     os.makedirs(directory)
 
 for idx in range(args.start_idx, args.stop_idx):
-  download_files(directory, 'ILSVRC2012_train/files-shuf-%03d.tar' % idx)
+    download_files(directory, 'ILSVRC2012_train/files-shuf-%03d.tar' % idx)
diff --git a/libccaffe/CMakeLists.txt b/libccaffe/CMakeLists.txt
index 6d5a5b9..4147296 100644
--- a/libccaffe/CMakeLists.txt
+++ b/libccaffe/CMakeLists.txt
@@ -53,6 +53,12 @@ include_directories(${PROJECT_BINARY_DIR}/../caffe/include/)
 # for gtest
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../caffe/src/)
 
+# for lmdb
+add_definitions(-DUSE_LMDB)
+find_package(LMDB REQUIRED)
+include_directories(SYSTEM ${LMDB_INCLUDE_DIR})
+list(APPEND Caffe_LINKER_LIBS ${LMDB_LIBRARIES})
+
 ## The files
 
 file(GLOB Source GLOB "${CMAKE_CURRENT_SOURCE_DIR}/../caffe/src/caffe/*.cpp")
diff --git a/libccaffe/ccaffe.cpp b/libccaffe/ccaffe.cpp
index 7d37a22..6814122 100644
--- a/libccaffe/ccaffe.cpp
+++ b/libccaffe/ccaffe.cpp
@@ -9,6 +9,7 @@
 #include <caffe/caffe.hpp>
 #include <caffe/sgd_solvers.hpp>
 #include <caffe/util/io.hpp>
+#include "caffe/parallel.hpp"
 
 #include <boost/shared_ptr.hpp>
@@ -17,13 +18,21 @@
 using boost::shared_ptr;
 using google::protobuf::Message;
 
+// a caffenet_state supports working with either (a) a single network
+// (state->net), (b) a testing and a training network coupled by a solver
+// (state->solver, state->net, state->testnet) or (c) a solver that supports
+// multiple GPUs on one machine (state->sync, state->solver, state->net,
+// state->testnet)
+
 struct caffenet_state {
   caffe::Net<DTYPE> *net;  // holds a net, can be used independently of solver and testnet
-  caffe::Solver<DTYPE> *solver;  // holds a solver
+  shared_ptr<caffe::Solver<DTYPE> > solver;  // holds a solver
+  caffe::P2PSync<DTYPE>* sync;  // GPU manager needed for multiple GPUs
   caffe::Net<DTYPE> *testnet;  // reference to the first (and only) testnet of the solver
   std::vector<DTYPE> *test_score;  // scratch space to store test scores
   Message* proto;  // scratch space to store protobuf message
   std::vector<char>* buffer;  // scratch space to pass serialized protobuf to client
+  std::vector<shared_ptr<caffe::P2PSync<DTYPE> > > syncs;  // vector of GPU managers, needed for multiple GPUs
 };
 
 void init_logging(const char* log_filename, int log_verbosity) {
@@ -48,7 +57,8 @@ caffenet_state* create_state() {
   caffenet_state *state = new caffenet_state();
   state->net = NULL;
   state->testnet = NULL;
-  state->solver = NULL;
+  state->solver = shared_ptr<caffe::Solver<DTYPE> >();
+  state->sync = NULL;
   state->test_score = new std::vector<DTYPE>();
   state->proto = NULL;
   state->buffer = new std::vector<char>();
@@ -56,9 +66,7 @@ caffenet_state* create_state() {
 }
 
 void destroy_state(caffenet_state* state) {
-  if (state->solver != NULL) {
-    delete state->solver;
-  } else if (state->net != NULL) {
+  if (state->net != NULL && !state->solver) {
     delete state->net;
   }
   if (state->proto != NULL) {
@@ -69,12 +77,34 @@ void destroy_state(caffenet_state* state) {
   delete state;
 }
 
-void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len) {
+void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len, int num_gpus) {
   caffe::SolverParameter param;
   param.ParseFromString(std::string(solver_param, solver_param_len));
-  state->solver = new caffe::SGDSolver<DTYPE>(param);
+
+  std::vector<int> gpus;
+  for (int i = 0; i < num_gpus; i++) {
+    gpus.push_back(i);
+  }
+
+  if (num_gpus > 1) {
+    param.set_device_id(gpus[0]);
+    caffe::Caffe::SetDevice(gpus[0]);
+    caffe::Caffe::set_mode(caffe::Caffe::GPU);
+    caffe::Caffe::set_solver_count(gpus.size());
+  }
+
+  state->solver = shared_ptr<caffe::Solver<DTYPE> >(caffe::SolverRegistry<DTYPE>::CreateSolver(param));
+
   state->net = state->solver->net().get();
-  state->testnet = state->solver->test_nets()[0].get();
+
+  if (param.test_iter_size() > 0) {
+    state->testnet = state->solver->test_nets()[0].get();
+  }
+
+  if (num_gpus > 1) {
+    state->sync = new caffe::P2PSync<DTYPE>(state->solver, NULL, state->solver->param());
+    state->syncs = state->sync->initialize(gpus);
+  }
 }
 
 void load_net_from_protobuf(caffenet_state* state, const char* net_param, int net_param_len) {
@@ -172,10 +202,16 @@ void backward(caffenet_state* state) {
 }
 
 int solver_step(caffenet_state* state, int step) {
-  state->solver->Step(step);
+  CHECK(caffe::Caffe::root_solver());
+  if (state->sync == NULL) {
+    state->solver->Step(step);
+  } else {
+    state->sync->step(state->syncs, step);
+  }
   return 0;  // success
 }
 
+// TODO: Make sure that there is an error message if test is called without a testnet
 void solver_test(caffenet_state* state, int num_steps) {
   state->test_score->clear();
   state->solver->TestAndStoreResult(0, num_steps, state->test_score);
@@ -206,6 +242,12 @@ void load_weights_from_file(caffenet_state* state, const char* filename) {
   state->net->CopyTrainedLayersFrom(filename);
 }
 
+void save_weights_to_file(caffenet_state* state, const char* filename) {
+  caffe::NetParameter net_param;
+  state->net->ToProto(&net_param, state->solver->param().snapshot_diff());
+  WriteProtoToBinaryFile(net_param, filename);
+}
+
 void restore_solver_from_file(caffenet_state* state, const char* filename) {
   state->solver->Restore(filename);
 }
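The TODO on solver_test above could be handled with an explicit check. The following is a sketch, not part of the patch; it relies on glog's CHECK macro (which Caffe already uses) and on testnet being initialized to NULL in create_state:

    // Possible guard for solver_test: fail with a clear message when no test
    // net was configured, i.e. load_solver_from_protobuf saw an empty test_iter.
    void solver_test(caffenet_state* state, int num_steps) {
      CHECK(state->testnet != NULL)
          << "solver_test called, but the solver prototxt defines no test net "
          << "(test_iter is not set)";
      state->test_score->clear();
      state->solver->TestAndStoreResult(0, num_steps, state->test_score);
    }

This matters here because the patch removes test_iter from quick_solver.prototxt, so the multi-GPU configuration runs with no test net at all.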
diff --git a/libccaffe/ccaffe.h b/libccaffe/ccaffe.h
index d2261fd..7eabe69 100644
--- a/libccaffe/ccaffe.h
+++ b/libccaffe/ccaffe.h
@@ -20,7 +20,7 @@ extern "C" {
   caffenet_state* create_state();
   void destroy_state(caffenet_state* state);
 
-  void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len);
+  void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len, int num_gpus);
   void load_net_from_protobuf(caffenet_state* state, const char* net_param, int net_param_len);
 
   // TODO: Write documentation for these methods (in particular the layer index)
@@ -59,6 +59,7 @@ extern "C" {
   void set_device(int gpu_id);
 
   void load_weights_from_file(caffenet_state* state, const char* filename);
+  void save_weights_to_file(caffenet_state* state, const char* filename);
   void restore_solver_from_file(caffenet_state* state, const char* filename);
 
   bool parse_net_prototxt(caffenet_state* state, const char* filename);
diff --git a/src/main/java/libs/CaffeLibrary.java b/src/main/java/libs/CaffeLibrary.java
index 8634eae..97b6014 100644
--- a/src/main/java/libs/CaffeLibrary.java
+++ b/src/main/java/libs/CaffeLibrary.java
@@ -23,7 +23,7 @@ interface java_callback_t extends Callback {
   void destroy_state(Pointer state);
 
   void load_net_from_protobuf(Pointer state, Pointer net_param, int net_param_len);
-  void load_solver_from_protobuf(Pointer state, Pointer solver_param, int solver_param_len);
+  void load_solver_from_protobuf(Pointer state, Pointer solver_param, int solver_param_len, int num_gpus);
 
   int set_train_data_callback(Pointer state, int layer_idx, java_callback_t callback);
   int set_test_data_callback(Pointer state, int layer_idx, java_callback_t callback);
@@ -57,6 +57,7 @@ interface java_callback_t extends Callback {
   void set_device(int gpu_id);
 
   void load_weights_from_file(Pointer state, String filename);
+  void save_weights_to_file(Pointer state, String filename);
   void restore_solver_from_file(Pointer state, String filename);
 
   boolean parse_net_prototxt(Pointer state, String filename);
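Together, the header and JNA changes expose one new parameter (num_gpus) and one new entry point (save_weights_to_file). A minimal call sequence from a C++ client could look as follows; this is a sketch, with solver_proto assumed to hold a serialized SolverParameter and the GPU count, step count, and output path invented for the example:

    #include "ccaffe.h"

    void run_multi_gpu(const char* solver_proto, int solver_proto_len) {
      caffenet_state* state = create_state();
      // With num_gpus > 1, load_solver_from_protobuf builds the P2PSync
      // tree over devices 0..num_gpus-1; with num_gpus == 1 it behaves
      // exactly as before.
      load_solver_from_protobuf(state, solver_proto, solver_proto_len, 4);
      solver_step(state, 50);  // dispatched to P2PSync::step on the multi-GPU path
      save_weights_to_file(state, "weights.caffemodel");
      destroy_state(state);
    }

This is the sequence the Scala layer below drives through JNA, with solver_step called once per synchronization interval.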
diff --git a/src/main/scala/apps/MultiGPUApp.scala b/src/main/scala/apps/MultiGPUApp.scala
new file mode 100644
index 0000000..98b825e
--- /dev/null
+++ b/src/main/scala/apps/MultiGPUApp.scala
@@ -0,0 +1,62 @@
+package apps
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+
+import libs._
+import loaders._
+import preprocessing._
+
+object MultiGPUApp {
+  // initialize nets on workers
+  val sparkNetHome = "/root/SparkNet"
+  System.load(sparkNetHome + "/build/libccaffe.so")
+  val caffeLib = CaffeLibrary.INSTANCE
+  caffeLib.set_basepath(sparkNetHome + "/caffe/")
+  var netParameter = ProtoLoader.loadNetPrototxt(sparkNetHome + "/caffe/models/bvlc_googlenet/train_val.prototxt")
+  val solverParameter = ProtoLoader.loadSolverPrototxtWithNet(sparkNetHome + "/caffe/models/bvlc_googlenet/quick_solver.prototxt", netParameter, None)
+  val net = CaffeNet(solverParameter, 4) // TODO: fix this, numGPUs = 1 does not work here
+
+  def main(args: Array[String]) {
+    val numWorkers = args(0).toInt
+    val conf = new SparkConf()
+      .setAppName("ImageNet")
+      .set("spark.driver.maxResultSize", "30G")
+      .set("spark.task.maxFailures", "1")
+      .set("spark.eventLog.enabled", "true")
+    val sc = new SparkContext(conf)
+
+    var netWeights = net.getWeights()
+    val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)
+
+    val caffeLib = CaffeLibrary.INSTANCE
+    val state = net.getState()
+
+    var i = 0
+    while (true) {
+      val broadcastWeights = sc.broadcast(netWeights)
+
+      workers.foreach(_ => net.setWeights(broadcastWeights.value))
+
+      val syncInterval = 50
+      workers.foreachPartition(
+        _ => {
+          net.train(syncInterval)
+          ()
+        }
+      )
+
+      netWeights = workers.map(_ => net.getWeights()).reduce((a, b) => WeightCollection.add(a, b))
+      netWeights.scalarDivide(1F * numWorkers)
+
+      // save weights:
+      if (i % 500 == 0) {
+        net.setWeights(netWeights)
+        caffeLib.save_weights_to_file(state, "/imgnet/params/" + "%09d".format(i) + ".caffemodel")
+      }
+
+      i += 50
+    }
+  }
+}
diff --git a/src/main/scala/libs/Net.scala b/src/main/scala/libs/Net.scala
index c284175..daa93d1 100644
--- a/src/main/scala/libs/Net.scala
+++ b/src/main/scala/libs/Net.scala
@@ -76,6 +76,10 @@ class CaffeNet(state: Pointer, library: CaffeLibrary) extends Net {
 
   var numTestBatches = None: Option[Int]
 
+  def getState(): Pointer = {
+    return state
+  }
+
   def setTrainData(minibatchSampler: MinibatchSampler, trainPreprocessing: Option[(ByteImage, Array[Float]) => Unit] = None) = {
     imageTrainCallback = Some(makeImageCallback(minibatchSampler, trainPreprocessing))
     labelTrainCallback = Some(makeLabelCallback(minibatchSampler))
@@ -207,13 +211,13 @@ class CaffeNet(state: Pointer, library: CaffeLibrary) extends Net {
 }
 
 object CaffeNet {
-  def apply(solverParameter: SolverParameter): CaffeNet = {
+  def apply(solverParameter: SolverParameter, numGPUs: Int = 1): CaffeNet = {
     val caffeLib = CaffeLibrary.INSTANCE
     val state = caffeLib.create_state()
     val byteArr = solverParameter.toByteArray()
     val ptr = new Memory(byteArr.length)
     ptr.write(0, byteArr, 0, byteArr.length)
-    caffeLib.load_solver_from_protobuf(state, ptr, byteArr.length)
+    caffeLib.load_solver_from_protobuf(state, ptr, byteArr.length, numGPUs)
    return new CaffeNet(state, caffeLib)
   }
 }