Skip to content

Commit

Permalink
Merge pull request #3731 from lukeyeager/lmdb-map-full
Browse files Browse the repository at this point in the history
dynamically set LMDB map size (double when full)
  • Loading branch information
shelhamer committed Apr 25, 2016
2 parents faba632 + 74040cb commit d8e2f05
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 93 deletions.
2 changes: 2 additions & 0 deletions examples/cifar10/convert_cifar_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ void convert_dataset(const string& input_folder, const string& output_folder,
}

int main(int argc, char** argv) {
FLAGS_alsologtostderr = 1;

if (argc != 4) {
printf("This script converts the CIFAR dataset to the leveldb format used\n"
"by caffe to perform classification.\n"
Expand Down
89 changes: 14 additions & 75 deletions examples/mnist/convert_mnist_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
#include <fstream> // NOLINT(readability/streams)
#include <string>

#include "boost/scoped_ptr.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/db.hpp"
#include "caffe/util/format.hpp"

#if defined(USE_LEVELDB) && defined(USE_LMDB)

using namespace caffe; // NOLINT(build/namespaces)
using boost::scoped_ptr;
using std::string;

DEFINE_string(backend, "lmdb", "The backend for storing the result");
Expand Down Expand Up @@ -67,43 +70,10 @@ void convert_dataset(const char* image_filename, const char* label_filename,
image_file.read(reinterpret_cast<char*>(&cols), 4);
cols = swap_endian(cols);

// lmdb
MDB_env *mdb_env;
MDB_dbi mdb_dbi;
MDB_val mdb_key, mdb_data;
MDB_txn *mdb_txn;
// leveldb
leveldb::DB* db;
leveldb::Options options;
options.error_if_exists = true;
options.create_if_missing = true;
options.write_buffer_size = 268435456;
leveldb::WriteBatch* batch = NULL;

// Open db
if (db_backend == "leveldb") { // leveldb
LOG(INFO) << "Opening leveldb " << db_path;
leveldb::Status status = leveldb::DB::Open(
options, db_path, &db);
CHECK(status.ok()) << "Failed to open leveldb " << db_path
<< ". Is it already existing?";
batch = new leveldb::WriteBatch();
} else if (db_backend == "lmdb") { // lmdb
LOG(INFO) << "Opening lmdb " << db_path;
CHECK_EQ(mkdir(db_path, 0744), 0)
<< "mkdir " << db_path << "failed";
CHECK_EQ(mdb_env_create(&mdb_env), MDB_SUCCESS) << "mdb_env_create failed";
CHECK_EQ(mdb_env_set_mapsize(mdb_env, 1099511627776), MDB_SUCCESS) // 1TB
<< "mdb_env_set_mapsize failed";
CHECK_EQ(mdb_env_open(mdb_env, db_path, 0, 0664), MDB_SUCCESS)
<< "mdb_env_open failed";
CHECK_EQ(mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn), MDB_SUCCESS)
<< "mdb_txn_begin failed";
CHECK_EQ(mdb_open(mdb_txn, NULL, 0, &mdb_dbi), MDB_SUCCESS)
<< "mdb_open failed. Does the lmdb already exist? ";
} else {
LOG(FATAL) << "Unknown db backend " << db_backend;
}

scoped_ptr<db::DB> db(db::GetDB(db_backend));
db->Open(db_path, db::NEW);
scoped_ptr<db::Transaction> txn(db->NewTransaction());

// Storing to db
char label;
Expand All @@ -125,59 +95,28 @@ void convert_dataset(const char* image_filename, const char* label_filename,
string key_str = caffe::format_int(item_id, 8);
datum.SerializeToString(&value);

// Put in db
if (db_backend == "leveldb") { // leveldb
batch->Put(key_str, value);
} else if (db_backend == "lmdb") { // lmdb
mdb_data.mv_size = value.size();
mdb_data.mv_data = reinterpret_cast<void*>(&value[0]);
mdb_key.mv_size = key_str.size();
mdb_key.mv_data = reinterpret_cast<void*>(&key_str[0]);
CHECK_EQ(mdb_put(mdb_txn, mdb_dbi, &mdb_key, &mdb_data, 0), MDB_SUCCESS)
<< "mdb_put failed";
} else {
LOG(FATAL) << "Unknown db backend " << db_backend;
}
txn->Put(key_str, value);

if (++count % 1000 == 0) {
// Commit txn
if (db_backend == "leveldb") { // leveldb
db->Write(leveldb::WriteOptions(), batch);
delete batch;
batch = new leveldb::WriteBatch();
} else if (db_backend == "lmdb") { // lmdb
CHECK_EQ(mdb_txn_commit(mdb_txn), MDB_SUCCESS)
<< "mdb_txn_commit failed";
CHECK_EQ(mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn), MDB_SUCCESS)
<< "mdb_txn_begin failed";
} else {
LOG(FATAL) << "Unknown db backend " << db_backend;
}
txn->Commit();
}
}
// write the last batch
if (count % 1000 != 0) {
if (db_backend == "leveldb") { // leveldb
db->Write(leveldb::WriteOptions(), batch);
delete batch;
delete db;
} else if (db_backend == "lmdb") { // lmdb
CHECK_EQ(mdb_txn_commit(mdb_txn), MDB_SUCCESS) << "mdb_txn_commit failed";
mdb_close(mdb_env, mdb_dbi);
mdb_env_close(mdb_env);
} else {
LOG(FATAL) << "Unknown db backend " << db_backend;
}
LOG(ERROR) << "Processed " << count << " files.";
txn->Commit();
}
LOG(INFO) << "Processed " << count << " files.";
delete[] pixels;
db->Close();
}

int main(int argc, char** argv) {
#ifndef GFLAGS_GFLAGS_H_
namespace gflags = google;
#endif

FLAGS_alsologtostderr = 1;

gflags::SetUsageMessage("This script converts the MNIST dataset to\n"
"the lmdb/leveldb format used by Caffe to load data.\n"
"Usage:\n"
Expand Down
13 changes: 8 additions & 5 deletions include/caffe/util/db_lmdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#define CAFFE_UTIL_DB_LMDB_HPP

#include <string>
#include <vector>

#include "lmdb.h"

Expand Down Expand Up @@ -54,14 +55,16 @@ class LMDBCursor : public Cursor {

// LMDB-backed implementation of the Transaction interface.
// NOTE(review): this listing is a rendered diff, so it appears to contain both
// the replaced declarations (the MDB_dbi*/MDB_txn* constructor, the inline
// Commit(), and the mdb_dbi_/mdb_txn_ members) and their replacements (the
// MDB_env* constructor, the out-of-line Commit(), mdb_env_, keys/values and
// DoubleMapSize()). Confirm against the real header which set is current.
class LMDBTransaction : public Transaction {
public:
// Stores the given database handle pointer and transaction pointer.
explicit LMDBTransaction(MDB_dbi* mdb_dbi, MDB_txn* mdb_txn)
: mdb_dbi_(mdb_dbi), mdb_txn_(mdb_txn) { }
// Stores the given environment pointer; no LMDB call is made here.
explicit LMDBTransaction(MDB_env* mdb_env)
: mdb_env_(mdb_env) { }
// Adds a key/value pair to this transaction (defined out of line).
virtual void Put(const string& key, const string& value);
// Inline variant: commits mdb_txn_ directly and checks the status.
virtual void Commit() { MDB_CHECK(mdb_txn_commit(mdb_txn_)); }
// Out-of-line variant (defined in the .cpp).
virtual void Commit();

private:
// Handles used by the MDB_dbi*/MDB_txn* constructor and the inline Commit().
MDB_dbi* mdb_dbi_;
MDB_txn* mdb_txn_;
// Environment pointer supplied to the MDB_env* constructor.
MDB_env* mdb_env_;
// Parallel lists of keys and values held by the transaction.
vector<string> keys, values;

// Helper that enlarges the LMDB map (defined in the .cpp).
void DoubleMapSize();

DISABLE_COPY_AND_ASSIGN(LMDBTransaction);
};
Expand Down
65 changes: 52 additions & 13 deletions src/caffe/util/db_lmdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@

namespace caffe { namespace db {

const size_t LMDB_MAP_SIZE = 1099511627776; // 1 TB

void LMDB::Open(const string& source, Mode mode) {
MDB_CHECK(mdb_env_create(&mdb_env_));
MDB_CHECK(mdb_env_set_mapsize(mdb_env_, LMDB_MAP_SIZE));
if (mode == NEW) {
CHECK_EQ(mkdir(source.c_str(), 0744), 0) << "mkdir " << source << "failed";
}
Expand Down Expand Up @@ -48,19 +45,61 @@ LMDBCursor* LMDB::NewCursor() {
}

// Returns a newly allocated LMDBTransaction for this database.
// NOTE(review): this listing is a rendered diff. The first four statements
// (begin a transaction, open the dbi, return a transaction wrapping both)
// look like the replaced body, and the final return (wrapping only mdb_env_)
// like its replacement; as listed, the second return is unreachable. Confirm
// against the real file.
LMDBTransaction* LMDB::NewTransaction() {
MDB_txn* mdb_txn;
MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn));
MDB_CHECK(mdb_dbi_open(mdb_txn, NULL, 0, &mdb_dbi_));
return new LMDBTransaction(&mdb_dbi_, mdb_txn);
return new LMDBTransaction(mdb_env_);
}

// Adds one key/value pair to this transaction.
// NOTE(review): this listing is a rendered diff. The MDB_val setup plus the
// immediate mdb_put through mdb_txn_/mdb_dbi_ look like the replaced body, and
// the two push_back calls (which only copy the pair into keys/values) like its
// replacement. Confirm against the real file which statements remain.
void LMDBTransaction::Put(const string& key, const string& value) {
MDB_val mdb_key, mdb_value;
// Point the MDB_vals at the strings' own storage; nothing is copied here.
mdb_key.mv_data = const_cast<char*>(key.data());
mdb_key.mv_size = key.size();
mdb_value.mv_data = const_cast<char*>(value.data());
mdb_value.mv_size = value.size();
MDB_CHECK(mdb_put(mdb_txn_, *mdb_dbi_, &mdb_key, &mdb_value, 0));
// Append copies of the pair to the parallel keys/values lists.
keys.push_back(key);
values.push_back(value);
}

/**
 * @brief Writes every buffered key/value pair to LMDB in one transaction.
 *
 * The pairs held in `keys`/`values` are put into a freshly begun LMDB
 * transaction, which is then committed. If LMDB reports MDB_MAP_FULL, either
 * from mdb_put or from mdb_txn_commit, the attempt is discarded, the map size
 * is doubled via DoubleMapSize(), and the whole batch is replayed in a new
 * transaction. Any other LMDB error is passed to MDB_CHECK.
 *
 * On success the buffered pairs are cleared.
 */
void LMDBTransaction::Commit() {
  MDB_val mdb_key, mdb_data;

  // Retry in a loop (not by recursion) so repeated doublings of the map size
  // do not deepen the call stack.
  for (;;) {
    MDB_dbi mdb_dbi;
    MDB_txn* mdb_txn;

    // Initialize MDB variables
    MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn));
    MDB_CHECK(mdb_dbi_open(mdb_txn, NULL, 0, &mdb_dbi));

    bool map_full = false;
    for (size_t i = 0; i < keys.size(); ++i) {
      mdb_key.mv_size = keys[i].size();
      mdb_key.mv_data = const_cast<char*>(keys[i].data());
      mdb_data.mv_size = values[i].size();
      mdb_data.mv_data = const_cast<char*>(values[i].data());

      // Add data to the transaction
      const int put_rc = mdb_put(mdb_txn, mdb_dbi, &mdb_key, &mdb_data, 0);
      if (put_rc == MDB_MAP_FULL) {
        map_full = true;
        break;
      }
      // Any status other than success or MDB_MAP_FULL is an error.
      MDB_CHECK(put_rc);
    }

    if (map_full) {
      // Discard the partially filled transaction before resizing.
      mdb_txn_abort(mdb_txn);
    } else {
      // The map can also fill up while committing. mdb_txn_commit releases
      // the transaction handle itself, so no abort is needed on this path.
      const int commit_rc = mdb_txn_commit(mdb_txn);
      if (commit_rc == MDB_MAP_FULL) {
        map_full = true;
      } else {
        MDB_CHECK(commit_rc);
      }
    }

    mdb_dbi_close(mdb_env_, mdb_dbi);

    if (!map_full) {
      // Committed: the buffered pairs are no longer needed.
      keys.clear();
      values.clear();
      return;
    }

    // Out of map space: double the map size and replay the whole batch.
    DoubleMapSize();
  }
}

void LMDBTransaction::DoubleMapSize() {
struct MDB_envinfo current_info;
MDB_CHECK(mdb_env_info(mdb_env_, &current_info));
size_t new_size = current_info.me_mapsize * 2;
DLOG(INFO) << "Doubling LMDB map size to " << (new_size>>20) << "MB ...";
MDB_CHECK(mdb_env_set_mapsize(mdb_env_, new_size));
}

} // namespace db
Expand Down

0 comments on commit d8e2f05

Please sign in to comment.