Skip to content

Commit

Permalink
MongoDB driver changes.
Browse files Browse the repository at this point in the history
Implemented safe, w, journal, fsync, wTimeoutMS, and the ability to
specify a database name in the mongdb:// URL.

If safe mode is specified in the URL (or is implied based on the other
options) the getLastError command will be run after the commands
insert, update, and remove.

If a database name is given on the mongdb:// URL it is prepended to calls to
MongoDB.opIndex(). So if you specify database mydb on the URL
(mongodb://localhost/mydb) you can just give the collection name instead
of the database.collection string.

Examples:

No database specified:
auto db = connectMongoDB("mongodb://localhost");
auto col = db["mydb.mycollection"];

Database name of mydb specified in the URL:
auto db = connectMongoDB("mongodb://localhost/mydb");
auto col = db["mycollection"];

If you specify a database in the URL you can still access other database by
using the new getCollection() function which ignores the database specified
in the mongodb:// URL.

auto db = connectMongoDB("mongodb://localhost/mydb");
auto col = db.getCollection("admin.system.indexes");
  • Loading branch information
davideagen committed Jul 31, 2012
1 parent 73b1ca0 commit ea2a0c6
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 56 deletions.
209 changes: 164 additions & 45 deletions source/vibe/db/mongo/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,30 @@ import std.string;
*/
class MongoConnection : EventedObject {
private {
MongoClientSettings config;
MongoClientSettings settings;
TcpConnection m_conn;
ulong m_bytesRead;
int m_msgid = 1;
}

this(string server, ushort port = 27017)
{
config = new MongoClientSettings();
config.hosts ~= new MongoHost(server, port);
settings = new MongoClientSettings();
settings.hosts ~= new MongoHost(server, port);
}

this(MongoClientSettings cfg)
{
config = cfg;
settings = cfg;

// Now let's check for features that are not yet supported.
if(config.hosts.length > 1)
logWarn("Multiple mongodb hosts are not yet supported. Using first one: {}:{}",
config.hosts[0].name, config.hosts[0].port);
if(config.username != "")
logWarn("MongoDB username is not yet supported. Ignoring username: {}", config.username);
if(config.password != "")
if(settings.hosts.length > 1)
logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
settings.hosts[0].name, settings.hosts[0].port);
if(settings.username != string.init)
logWarn("MongoDB username is not yet supported. Ignoring username: %s", settings.username);
if(settings.password != string.init)
logWarn("MongoDB password is not yet supported. Ignoring password.");
if(config.database != "")
logWarn("MongoDB database is not yet supported. Ignoring database value: {}", config.database);
}

// changes the ownership of this connection
Expand All @@ -75,7 +73,7 @@ class MongoConnection : EventedObject {
* TODO: Connect to one of the specified hosts taking into consideration
* options such as connect timeouts and so on.
*/
m_conn = connectTcp(config.hosts[0].name, config.hosts[0].port);
m_conn = connectTcp(settings.hosts[0].name, settings.hosts[0].port);
m_bytesRead = 0;
}

Expand All @@ -99,6 +97,10 @@ class MongoConnection : EventedObject {
msg.addBSON(selector);
msg.addBSON(update);
send(msg);
if(settings.safe)
{
safeModeLastError(collection_name);
}
}

void insert(string collection_name, InsertFlags flags, Bson[] documents)
Expand All @@ -112,13 +114,18 @@ class MongoConnection : EventedObject {
msg.addBSON(d);
}
send(msg);

if(settings.safe)
{
safeModeLastError(collection_name);
}
}

Reply query(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector = Bson(null))
{
scope(failure) disconnect();
scope msg = new Message(OpCode.Query);
msg.addInt(flags | config.defQueryFlags);
msg.addInt(flags | settings.defQueryFlags);
msg.addCString(collection_name);
msg.addInt(nskip);
msg.addInt(nret);
Expand Down Expand Up @@ -148,6 +155,10 @@ class MongoConnection : EventedObject {
msg.addInt(flags);
msg.addBSON(selector);
send(msg);
if(settings.safe)
{
safeModeLastError(collection_name);
}
}

void killCursors(long[] cursors)
Expand Down Expand Up @@ -235,17 +246,38 @@ class MongoConnection : EventedObject {
private void recv(ubyte[] dst) { enforce(m_conn); m_conn.read(dst); m_bytesRead += dst.length; }

private int nextMessageId() { return m_msgid++; }

private Reply safeModeLastError(string collection_name)
{
Bson[string] command_and_options = ["getlasterror": Bson(1)];

if(settings.w != settings.w.init)
command_and_options["w"] = settings.w; // Already a Bson struct
if(settings.wTimeoutMS != settings.wTimeoutMS.init)
command_and_options["wtimeout"] = Bson(settings.wTimeoutMS);
if(settings.journal)
command_and_options["j"] = Bson(true);
if(settings.fsync)
command_and_options["fsync"] = Bson(true);

logTrace("Running safeModeLastError on %s", collection_name);

Reply results = query(collection_name, QueryFlags.None | settings.defQueryFlags,
0, 0, serializeToBson(command_and_options));
return results;
}
}

/**
* Parses the given string as a mongodb URL. Url must be in the form documented at
* http://www.mongodb.org/display/DOCS/Connections which is:
* $(LINK http://www.mongodb.org/display/DOCS/Connections) which is:
*
* mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
*
* Returns true if the URL was successfully parsed. Returns false if the URL can not be parsed.
* If the URL is successfully parsed the MongoConfig struct will contain the parsed data.
* If the URL is not successfully parsed the information in the MongoConfig struct may be
* Returns: true if the URL was successfully parsed. False if the URL can not be parsed.
*
* If the URL is successfully parsed the MongoClientSettings instance will contain the parsed config.
* If the URL is not successfully parsed the information in the MongoClientSettings instance may be
* incomplete and should not be used.
*/
bool parseMongoDBUrl(out MongoClientSettings cfg, string url)
Expand Down Expand Up @@ -351,29 +383,107 @@ bool parseMongoDBUrl(out MongoClientSettings cfg, string url)
auto setting = to!bool(value);
if(setting) cfg.defQueryFlags |= QueryFlags.SlaveOk;
} catch (Exception e) {
logError("Value for slaveOk must be true or false but was {}", value);
logError("Value for slaveOk must be true or false but was %s", value);
}
break;

// These options aren't implemented yet so we'll warn on them.
case "replicaset":
cfg.replicaSet = value;

logWarn("MongoDB option %s not yet implemented.", option);
break;

case "safe":
try
{
cfg.safe = to!bool(value);
} catch (Exception e) {
logError("Value for safe must be true or false but was %s", value);
}
break;

case "w":
try
{
if(icmp(value, "majority") == 0)
{
cfg.w = Bson("majority");
} else {
cfg.w = Bson(to!long(value));
}
} catch (Exception e) {
logError("Invalid w value: [%s] Should be an integer number or 'majority'", value);
}

break;

case "wtimeoutms":
try
{
cfg.wTimeoutMS = to!long(value);
} catch (Exception e) {
logError("Invalid wTimeoutMS value: [%s] Should be an integer number", value);
}

break;

case "fsync":
try
{
cfg.fsync = to!bool(value);
} catch (Exception e) {
logError("Value for fsync must be true or false but was %s", value);
}

break;

case "journal":
case "connecttimeoutms":
try
{
cfg.journal = to!bool(value);
} catch (Exception e) {
logError("Value for journal must be true or false but was %s", value);
}

break;

case "connecttimeoutms":
try
{
cfg.connectTimeoutMS = to!long(value);
} catch (Exception e) {
logError("Invalid connectTimeoutMS value: [%s] Should be an integer number", value);
}

logWarn("MongoDB option %s not yet implemented.", option);
break;

case "sockettimeoutms":
logWarn("MongoDB option {} not yet implemented.", option);
try
{
cfg.socketTimeoutMS = to!long(value);
} catch (Exception e) {
logError("Invalid socketTimeoutMS value: [%s] Should be an integer number", value);
}

logWarn("MongoDB option %s not yet implemented.", option);
break;

// Catch-all
default:
logWarn("Unknown MongoDB option {}", option);
logWarn("Unknown MongoDB option %s", option);
}

// Store the options in string format in case we want them later.
cfg.options[option] = value;
}

/* Some settings imply safe. If they are set, set safe to true regardless
* of what it was set to in the URL string
*/
if( (cfg.w != Bson.init) || (cfg.wTimeoutMS != long.init) ||
cfg.journal || cfg.fsync )
{
cfg.safe = true;
}
}

return true;
Expand All @@ -387,17 +497,23 @@ unittest
assert(parseMongoDBUrl(cfg, "mongodb://localhost"));
assert(cfg.hosts.length == 1);
assert(cfg.database == "");
assert(cfg.options.length == 0);
assert(cfg.hosts[0].name == "localhost");
assert(cfg.hosts[0].port == 27017);
assert(cfg.defQueryFlags == QueryFlags.None);
assert(cfg.replicaSet == "");
assert(cfg.safe == false);
assert(cfg.w == Bson.init);
assert(cfg.wTimeoutMS == long.init);
assert(cfg.fsync == false);
assert(cfg.journal == false);
assert(cfg.connectTimeoutMS == long.init);
assert(cfg.socketTimeoutMS == long.init);

cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg, "mongodb://fred:foobar@localhost"));
assert(cfg.username == "fred");
assert(cfg.password == "foobar");
assert(cfg.hosts.length == 1);
assert(cfg.database == "");
assert(cfg.options.length == 0);
assert(cfg.hosts[0].name == "localhost");
assert(cfg.hosts[0].port == 27017);

Expand All @@ -409,8 +525,6 @@ unittest
assert(cfg.hosts.length == 1);
assert(cfg.hosts[0].name == "localhost");
assert(cfg.hosts[0].port == 27017);
assert(cfg.options.length == 0);
assert(cfg.defQueryFlags == QueryFlags.None);

cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg, "mongodb://host1,host2,host3/?safe=true&w=2&wtimeoutMS=2000&slaveOk=false"));
Expand All @@ -424,17 +538,15 @@ unittest
assert(cfg.hosts[1].port == 27017);
assert(cfg.hosts[2].name == "host3");
assert(cfg.hosts[2].port == 27017);
assert(cfg.options.length == 3);
assert(cfg.options["safe"] == "true");
assert(cfg.options["w"] == "2");
assert(cfg.options["wtimeoutms"] == "2000");
assert(cfg.options["slaveok"] == "false");
assert(cfg.defQueryFlags == QueryFlags.None);
assert(cfg.safe == true);
assert(cfg.w == Bson(2));
assert(cfg.wTimeoutMS == 2000);
assert(cfg.defQueryFlags == QueryFlags.SlaveOk);

cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg,
"mongodb://fred:[email protected],host2.other.example.com:27108,host3:"
"27019/mydb?safe=true;w=2;wtimeoutMS=2000;slaveok=true"));
"27019/mydb?journal=true;fsync=true;connectMS=1500;sockettimeoutMs=1000;w=majority"));
assert(cfg.username == "fred");
assert(cfg.password == "flinstone");
assert(cfg.database == "mydb");
Expand All @@ -444,13 +556,13 @@ unittest
assert(cfg.hosts[1].name == "host2.other.example.com");
assert(cfg.hosts[1].port == 27108);
assert(cfg.hosts[2].name == "host3");
assert(cfg.hosts[2].port == 27019);
assert(cfg.options.length == 3);
assert(cfg.options["safe"] == "true");
assert(cfg.options["w"] == "2");
assert(cfg.options["wtimeoutms"] == "2000");
assert(cfg.options["slaveok"] == "true");
assert(cfg.defQueryFlags & QueryFlags.SlaveOk);
assert(cfg.hosts[2].port == 27019);assert(cfg.fsync == false);
assert(cfg.fsync == true);
assert(cfg.journal == true);
assert(cfg.connectTimeoutMS == 1500);
assert(cfg.socketTimeoutMS == 1000);
assert(cfg.w == Bson("majority"));
assert(cfg.safe == true);

// Invalid URLs - these should fail to parse
cfg = MongoClientSettings.init;
Expand Down Expand Up @@ -540,8 +652,15 @@ class MongoClientSettings
string password;
MongoHost[] hosts;
string database;
string[string] options;
QueryFlags defQueryFlags = QueryFlags.None;
string replicaSet;
bool safe;
Bson w; // Either a number or the string 'majority'
long wTimeoutMS;
bool fsync;
bool journal;
long connectTimeoutMS;
long socketTimeoutMS;
}

class MongoHost
Expand Down
Loading

0 comments on commit ea2a0c6

Please sign in to comment.