multi GPU support on one node
pcmoritz committed Dec 8, 2015
1 parent 202b23f commit b73ed97
Showing 14 changed files with 183 additions and 65 deletions.
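
At a high level, this commit splits P2PSync::run() into an initialize()/step() pair, threads a num_gpus argument through the libccaffe C API, and adds save_weights_to_file. The following is only a rough sketch of how a client might drive the updated C API after this change; the buffer and output filename are hypothetical, and the serialized SolverParameter is assumed to come from the caller.

    // Sketch only: training on two GPUs of one node through the libccaffe C API.
    // `serialized_solver`/`solver_len` (a serialized SolverParameter) are assumed
    // to be provided by the caller; the output filename is illustrative.
    #include "ccaffe.h"

    void train_on_two_gpus(const char* serialized_solver, int solver_len) {
      caffenet_state* state = create_state();
      // With num_gpus > 1, load_solver_from_protobuf also sets up the P2PSync GPU manager.
      load_solver_from_protobuf(state, serialized_solver, solver_len, /*num_gpus=*/2);
      // solver_step() dispatches to P2PSync::step() whenever a sync object is present.
      solver_step(state, /*step=*/100);
      save_weights_to_file(state, "weights.caffemodel");
      destroy_state(state);
    }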
3 changes: 2 additions & 1 deletion caffe/include/caffe/parallel.hpp
@@ -93,7 +93,8 @@ class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
return solver_;
}

void run(const vector<int>& gpus);
vector<shared_ptr<P2PSync<Dtype> > > initialize(const vector<int>& gpus);
void step(vector<shared_ptr<P2PSync<Dtype> > > , int num_steps);

protected:
void on_start();
2 changes: 0 additions & 2 deletions caffe/models/bvlc_googlenet/quick_solver.prototxt
@@ -1,6 +1,4 @@
net: "models/bvlc_googlenet/train_val.prototxt"
test_iter: 1000
test_interval: 4000
test_initialization: false
display: 40
average_loss: 40
26 changes: 1 addition & 25 deletions caffe/models/bvlc_googlenet/train_val.prototxt
@@ -4,9 +4,6 @@ layer {
type: "Data"
top: "data"
top: "label"
include {
phase: TRAIN
}
transform_param {
mirror: true
crop_size: 224
@@ -15,32 +12,11 @@ layer {
mean_value: 123
}
data_param {
source: "examples/imagenet/ilsvrc12_train_lmdb"
source: "/imgnet/traindb"
batch_size: 32
backend: LMDB
}
}
layer {
name: "data"
type: "Data"
top: "data"
top: "label"
include {
phase: TEST
}
transform_param {
mirror: false
crop_size: 224
mean_value: 104
mean_value: 117
mean_value: 123
}
data_param {
source: "examples/imagenet/ilsvrc12_val_lmdb"
batch_size: 50
backend: LMDB
}
}
layer {
name: "conv1/7x7_s2"
type: "Convolution"
11 changes: 7 additions & 4 deletions caffe/src/caffe/parallel.cpp
@@ -380,7 +380,7 @@ void P2PSync<Dtype>::on_gradients_ready() {
}

template<typename Dtype>
void P2PSync<Dtype>::run(const vector<int>& gpus) {
vector<shared_ptr<P2PSync<Dtype> > > P2PSync<Dtype>::initialize(const vector<int>& gpus) {
// Pair devices for map-reduce synchronization
vector<DevicePair> pairs;
DevicePair::compute(gpus, &pairs);
@@ -418,13 +418,16 @@ void P2PSync<Dtype>::run(const vector<int>& gpus) {

LOG(INFO)<< "Starting Optimization";

return syncs;
}

template<typename Dtype>
void P2PSync<Dtype>::step(vector<shared_ptr<P2PSync<Dtype> > > syncs, int num_steps) {
for (int i = 1; i < syncs.size(); ++i) {
syncs[i]->StartInternalThread();
}

// Run root solver on current thread
solver_->Solve();

solver_->Step(num_steps);
for (int i = 1; i < syncs.size(); ++i) {
syncs[i]->StopInternalThread();
}
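
The change above replaces the old blocking run() with a two-phase API: initialize() pairs the devices and returns the per-GPU sync objects, while step() starts the worker threads, runs Step(num_steps) on the root solver, and stops the workers again, so it can be called repeatedly. Below is a rough C++ sketch of the new calling pattern, mirroring how ccaffe.cpp uses it further down; it assumes an existing boost::shared_ptr<caffe::Solver<float> > root_solver and that device, mode and solver_count have already been configured.

    #include <vector>
    #include <boost/shared_ptr.hpp>
    #include "caffe/parallel.hpp"

    // Sketch of the two-phase P2PSync API introduced by this commit.
    void train_multi_gpu(boost::shared_ptr<caffe::Solver<float> > root_solver) {
      std::vector<int> gpus;
      gpus.push_back(0);
      gpus.push_back(1);

      caffe::P2PSync<float> sync(root_solver, NULL, root_solver->param());
      // Build the per-GPU sync workers (the root solver runs on the current thread).
      std::vector<boost::shared_ptr<caffe::P2PSync<float> > > syncs =
          sync.initialize(gpus);
      // Run 100 iterations; unlike the old run(), this can be invoked again later.
      sync.step(syncs, 100);
    }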
2 changes: 1 addition & 1 deletion caffe/src/caffe/util/io.cpp
@@ -2,8 +2,8 @@
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/text_format.h>
#include <opencv2/core/core.hpp>
#ifdef USE_OPENCV
#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>
#include <opencv2/highgui/highgui_c.h>
#include <opencv2/imgproc/imgproc.hpp>
24 changes: 24 additions & 0 deletions cmake/Modules/FindLMDB.cmake
@@ -0,0 +1,24 @@
# Try to find the LMDB libraries and headers
# LMDB_FOUND - system has LMDB lib
# LMDB_INCLUDE_DIR - the LMDB include directory
# LMDB_LIBRARIES - Libraries needed to use LMDB

# FindCWD based on FindGMP by:
# Copyright (c) 2006, Laurent Montel, <[email protected]>
#
# Redistribution and use is allowed according to the terms of the BSD license.

# Adapted from FindCWD by:
# Copyright 2013 Conrad Steenberg <[email protected]>
# Aug 31, 2013

find_path(LMDB_INCLUDE_DIR NAMES lmdb.h PATHS "$ENV{LMDB_DIR}/include")
find_library(LMDB_LIBRARIES NAMES lmdb PATHS "$ENV{LMDB_DIR}/lib" )

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(LMDB DEFAULT_MSG LMDB_INCLUDE_DIR LMDB_LIBRARIES)

if(LMDB_FOUND)
message(STATUS "Found lmdb (include: ${LMDB_INCLUDE_DIR}, library: ${LMDB_LIBRARIES})")
mark_as_advanced(LMDB_INCLUDE_DIR LMDB_LIBRARIES)
endif()
8 changes: 4 additions & 4 deletions ec2/create_labelfile.py
@@ -12,12 +12,12 @@

trainfile = open(args.trainfile, 'r')
for line in trainfile.readlines():
(fname, label) = line.split()
labelmap[fname.upper()] = label # poor man's file name normalization
(fname, label) = line.split()
labelmap[fname.upper()] = label # poor man's file name normalization
trainfile.close()

outfile = open(args.outfile, 'w')
for root, dirs, files in os.walk(args.directory):
for f in files:
outfile.write(f + " " + labelmap[f.upper()] + "\n")
for f in files:
outfile.write(f + " " + labelmap[f.upper()] + "\n")
outfile.close()
30 changes: 15 additions & 15 deletions ec2/pull.py
@@ -20,30 +20,30 @@
args = parser.parse_args()

def download_files(directory, tar_path):
response = s3.get_object(Bucket='sparknet', Key=tar_path)
response = s3.get_object(Bucket='sparknet', Key=tar_path)

output = io.BytesIO()
output = io.BytesIO()

chunk = response['Body'].read(1024 * 8)
while chunk:
output.write(chunk)
chunk = response['Body'].read(1024 * 8)
chunk = response['Body'].read(1024 * 8)
while chunk:
output.write(chunk)
chunk = response['Body'].read(1024 * 8)

output.seek(0) # go to the beginning of the .tar file
output.seek(0) # go to the beginning of the .tar file

tar = tarfile.open(mode= "r", fileobj=output)
tar = tarfile.open(mode= "r", fileobj=output)

for member in tar.getmembers():
filename = member.path # in a format like 'n02099601_3085.JPEG'
content = tar.extractfile(member)
out = open(os.path.join(directory, filename), 'w')
out.write(content.read())
out.close()
for member in tar.getmembers():
filename = member.path # in a format like 'n02099601_3085.JPEG'
content = tar.extractfile(member)
out = open(os.path.join(directory, filename), 'w')
out.write(content.read())
out.close()


directory = os.path.join(args.directory, '%03d-%03d' % (args.start_idx, args.stop_idx))
if not os.path.exists(directory):
os.makedirs(directory)

for idx in range(args.start_idx, args.stop_idx):
download_files(directory, 'ILSVRC2012_train/files-shuf-%03d.tar' % idx)
download_files(directory, 'ILSVRC2012_train/files-shuf-%03d.tar' % idx)
6 changes: 6 additions & 0 deletions libccaffe/CMakeLists.txt
@@ -53,6 +53,12 @@ include_directories(${PROJECT_BINARY_DIR}/../caffe/include/)
# for gtest
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../caffe/src/)

# for lmdb
add_definitions(-DUSE_LMDB)
find_package(LMDB REQUIRED)
include_directories(SYSTEM ${LMDB_INCLUDE_DIRS})
list(APPEND Caffe_LINKER_LIBS ${LMDB_LIBRARIES})

## The files

file(GLOB Source GLOB "${CMAKE_CURRENT_SOURCE_DIR}/../caffe/src/caffe/*.cpp")
60 changes: 51 additions & 9 deletions libccaffe/ccaffe.cpp
@@ -9,6 +9,7 @@
#include <vector>
#include <boost/shared_ptr.hpp>
#include <unistd.h>
#include "caffe/parallel.hpp"

#include <glog/logging.h>

@@ -17,13 +18,21 @@
using boost::shared_ptr;
using google::protobuf::Message;

// a caffenet_state supports working with either (a) a single network
// (state->net), (b) a testing and a training network coupled by a solver
// (state->solver, state->net, state->testnet) or (c) a solver that supports
// multiple GPUs on one machine (state->sync, state->solver, state->net,
// state->testnet)

struct caffenet_state {
caffe::Net<DTYPE> *net; // holds a net, can be used independently of solver and testnet
caffe::Solver<DTYPE> *solver; // holds a solver
shared_ptr<caffe::Solver<DTYPE> > solver; // holds a solver
caffe::P2PSync<DTYPE>* sync; // GPU manager needed for multi GPUs
caffe::Net<DTYPE> *testnet; // reference to the first (and only) testnet of the solver
std::vector<DTYPE> *test_score; // scratch space to store test scores
Message* proto; // scratch space to store protobuf message
std::vector<char>* buffer; // scratch space to pass serialized protobuf to client
std::vector<shared_ptr<caffe::P2PSync<DTYPE> > > syncs; // vector of GPU managers, needed for multi GPU
};

void init_logging(const char* log_filename, int log_verbosity) {
@@ -48,17 +57,16 @@ caffenet_state* create_state() {
caffenet_state *state = new caffenet_state();
state->net = NULL;
state->testnet = NULL;
state->solver = NULL;
state->solver = shared_ptr<caffe::Solver<DTYPE> >();
state->sync = NULL;
state->test_score = new std::vector<DTYPE>();
state->proto = NULL;
state->buffer = new std::vector<char>();
return state;
}

void destroy_state(caffenet_state* state) {
if (state->solver != NULL) {
delete state->solver;
} else if (state->net != NULL) {
if (state->net != NULL && !state->solver) {
delete state->net;
}
if (state->proto != NULL) {
@@ -69,12 +77,34 @@ void destroy_state(caffenet_state* state) {
delete state;
}

void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len) {
void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len, int num_gpus) {
caffe::SolverParameter param;
param.ParseFromString(std::string(solver_param, solver_param_len));
state->solver = new caffe::SGDSolver<DTYPE>(param);

std::vector<int> gpus;
for (int i = 0; i < num_gpus; i++) {
gpus.push_back(i);
}

if (num_gpus > 1) {
param.set_device_id(gpus[0]);
caffe::Caffe::SetDevice(gpus[0]);
caffe::Caffe::set_mode(caffe::Caffe::GPU);
caffe::Caffe::set_solver_count(gpus.size());
}

state->solver = shared_ptr<caffe::Solver<DTYPE> >(caffe::SolverRegistry<DTYPE>::CreateSolver(param));

state->net = state->solver->net().get();
state->testnet = state->solver->test_nets()[0].get();

if(param.test_iter_size() > 0) {
state->testnet = state->solver->test_nets()[0].get();
}

if (num_gpus > 1) {
state->sync = new caffe::P2PSync<DTYPE>(state->solver, NULL, state->solver->param());
state->syncs = state->sync->initialize(gpus);
}
}

void load_net_from_protobuf(caffenet_state* state, const char* net_param, int net_param_len) {
@@ -172,10 +202,16 @@ void backward(caffenet_state* state) {
}

int solver_step(caffenet_state* state, int step) {
state->solver->Step(step);
CHECK(caffe::Caffe::root_solver());
if (state->sync == NULL) {
state->solver->Step(step);
} else {
state->sync->step(state->syncs, step);
}
return 0; // success
}

// TODO: Make sure that there is an error message if test is called without testnet
void solver_test(caffenet_state* state, int num_steps) {
state->test_score->clear();
state->solver->TestAndStoreResult(0, num_steps, state->test_score);
@@ -206,6 +242,12 @@ void load_weights_from_file(caffenet_state* state, const char* filename) {
state->net->CopyTrainedLayersFrom(filename);
}

void save_weights_to_file(caffenet_state* state, const char* filename) {
caffe::NetParameter net_param;
state->net->ToProto(&net_param, state->solver->param().snapshot_diff());
WriteProtoToBinaryFile(net_param, filename);
}

void restore_solver_from_file(caffenet_state* state, const char* filename) {
state->solver->Restore(filename);
}
3 changes: 2 additions & 1 deletion libccaffe/ccaffe.h
@@ -20,7 +20,7 @@ extern "C" {
caffenet_state* create_state();
void destroy_state(caffenet_state* state);

void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len);
void load_solver_from_protobuf(caffenet_state* state, const char* solver_param, int solver_param_len, int num_gpus);
void load_net_from_protobuf(caffenet_state* state, const char* net_param, int net_param_len);

// TODO: Write documentation for these methods (in particular the layer index)
@@ -59,6 +59,7 @@ extern "C" {
void set_device(int gpu_id);

void load_weights_from_file(caffenet_state* state, const char* filename);
void save_weights_to_file(caffenet_state* state, const char* filename);
void restore_solver_from_file(caffenet_state* state, const char* filename);

bool parse_net_prototxt(caffenet_state* state, const char* filename);
3 changes: 2 additions & 1 deletion src/main/java/libs/CaffeLibrary.java
@@ -23,7 +23,7 @@ interface java_callback_t extends Callback {
void destroy_state(Pointer state);

void load_net_from_protobuf(Pointer state, Pointer net_param, int net_param_len);
void load_solver_from_protobuf(Pointer state, Pointer solver_param, int solver_param_len);
void load_solver_from_protobuf(Pointer state, Pointer solver_param, int solver_param_len, int num_gpus);

int set_train_data_callback(Pointer state, int layer_idx, java_callback_t callback);
int set_test_data_callback(Pointer state, int layer_idx, java_callback_t callback);
@@ -57,6 +57,7 @@ interface java_callback_t extends Callback {
void set_device(int gpu_id);

void load_weights_from_file(Pointer state, String filename);
void save_weights_to_file(Pointer state, String filename);
void restore_solver_from_file(Pointer state, String filename);

boolean parse_net_prototxt(Pointer state, String filename);
(diffs for the remaining 2 changed files not shown)
