diff --git a/source/vibe/db/mongo/connection.d b/source/vibe/db/mongo/connection.d index 6ddab9714f..60d64785e8 100644 --- a/source/vibe/db/mongo/connection.d +++ b/source/vibe/db/mongo/connection.d @@ -26,7 +26,7 @@ import std.string; */ class MongoConnection : EventedObject { private { - MongoConnectionConfig config; + MongoClientSettings settings; TcpConnection m_conn; ulong m_bytesRead; int m_msgid = 1; @@ -34,23 +34,22 @@ class MongoConnection : EventedObject { this(string server, ushort port = 27017) { - config.hosts ~= MongoHost(server, port); + settings = new MongoClientSettings(); + settings.hosts ~= new MongoHost(server, port); } - this(MongoConnectionConfig cfg) + this(MongoClientSettings cfg) { - config = cfg; + settings = cfg; // Now let's check for features that are not yet supported. - if(config.hosts.length > 1) - logWarn("Multiple mongodb hosts are not yet supported. Using first one: {}:{}", - config.hosts[0].name, config.hosts[0].port); - if(config.username != "") - logWarn("MongoDB username is not yet supported. Ignoring username: {}", config.username); - if(config.password != "") + if(settings.hosts.length > 1) + logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", + settings.hosts[0].name, settings.hosts[0].port); + if(settings.username != string.init) + logWarn("MongoDB username is not yet supported. Ignoring username: %s", settings.username); + if(settings.password != string.init) logWarn("MongoDB password is not yet supported. Ignoring password."); - if(config.database != "") - logWarn("MongoDB database is not yet supported. Ignoring database value: {}", config.database); } // changes the ownership of this connection @@ -74,7 +73,7 @@ class MongoConnection : EventedObject { * TODO: Connect to one of the specified hosts taking into consideration * options such as connect timeouts and so on. */ - m_conn = connectTcp(config.hosts[0].name, config.hosts[0].port); + m_conn = connectTcp(settings.hosts[0].name, settings.hosts[0].port); m_bytesRead = 0; } @@ -98,6 +97,10 @@ class MongoConnection : EventedObject { msg.addBSON(selector); msg.addBSON(update); send(msg); + if(settings.safe) + { + safeModeLastError(collection_name); + } } void insert(string collection_name, InsertFlags flags, Bson[] documents) @@ -111,13 +114,18 @@ class MongoConnection : EventedObject { msg.addBSON(d); } send(msg); + + if(settings.safe) + { + safeModeLastError(collection_name); + } } Reply query(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector = Bson(null)) { scope(failure) disconnect(); scope msg = new Message(OpCode.Query); - msg.addInt(flags | config.defQueryFlags); + msg.addInt(flags | settings.defQueryFlags); msg.addCString(collection_name); msg.addInt(nskip); msg.addInt(nret); @@ -147,6 +155,10 @@ class MongoConnection : EventedObject { msg.addInt(flags); msg.addBSON(selector); send(msg); + if(settings.safe) + { + safeModeLastError(collection_name); + } } void killCursors(long[] cursors) @@ -234,21 +246,44 @@ class MongoConnection : EventedObject { private void recv(ubyte[] dst) { enforce(m_conn); m_conn.read(dst); m_bytesRead += dst.length; } private int nextMessageId() { return m_msgid++; } + + private Reply safeModeLastError(string collection_name) + { + Bson[string] command_and_options = ["getlasterror": Bson(1)]; + + if(settings.w != settings.w.init) + command_and_options["w"] = settings.w; // Already a Bson struct + if(settings.wTimeoutMS != settings.wTimeoutMS.init) + command_and_options["wtimeout"] = Bson(settings.wTimeoutMS); + if(settings.journal) + command_and_options["j"] = Bson(true); + if(settings.fsync) + command_and_options["fsync"] = Bson(true); + + logTrace("Running safeModeLastError on %s", collection_name); + + Reply results = query(collection_name, QueryFlags.None | settings.defQueryFlags, + 0, 0, serializeToBson(command_and_options)); + return results; + } } /** * Parses the given string as a mongodb URL. Url must be in the form documented at - * http://www.mongodb.org/display/DOCS/Connections which is: + * $(LINK http://www.mongodb.org/display/DOCS/Connections) which is: * * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] * - * Returns true if the URL was successfully parsed. Returns false if the URL can not be parsed. - * If the URL is successfully parsed the MongoConfig struct will contain the parsed data. - * If the URL is not successfully parsed the information in the MongoConfig struct may be + * Returns: true if the URL was successfully parsed. False if the URL can not be parsed. + * + * If the URL is successfully parsed the MongoClientSettings instance will contain the parsed config. + * If the URL is not successfully parsed the information in the MongoClientSettings instance may be * incomplete and should not be used. */ -bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) +bool parseMongoDBUrl(out MongoClientSettings cfg, string url) { + cfg = new MongoClientSettings(); + string tmpUrl = url[0..$]; // Slice of the url (not a copy) if(!startsWith(tmpUrl, "mongodb://")) @@ -301,7 +336,7 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) port = to!ushort(hostPort.front); } - cfg.hosts ~= MongoHost(host, port); + cfg.hosts ~= new MongoHost(host, port); } } catch ( Exception e) { return false; // Probably failed converting the port to ushort. @@ -348,29 +383,107 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) auto setting = to!bool(value); if(setting) cfg.defQueryFlags |= QueryFlags.SlaveOk; } catch (Exception e) { - logError("Value for slaveOk must be true or false but was {}", value); + logError("Value for slaveOk must be true or false but was %s", value); } break; + // These options aren't implemented yet so we'll warn on them. case "replicaset": + cfg.replicaSet = value; + + logWarn("MongoDB option %s not yet implemented.", option); + break; + case "safe": + try + { + cfg.safe = to!bool(value); + } catch (Exception e) { + logError("Value for safe must be true or false but was %s", value); + } + break; + case "w": + try + { + if(icmp(value, "majority") == 0) + { + cfg.w = Bson("majority"); + } else { + cfg.w = Bson(to!long(value)); + } + } catch (Exception e) { + logError("Invalid w value: [%s] Should be an integer number or 'majority'", value); + } + + break; + case "wtimeoutms": + try + { + cfg.wTimeoutMS = to!long(value); + } catch (Exception e) { + logError("Invalid wTimeoutMS value: [%s] Should be an integer number", value); + } + + break; + case "fsync": + try + { + cfg.fsync = to!bool(value); + } catch (Exception e) { + logError("Value for fsync must be true or false but was %s", value); + } + + break; + case "journal": - case "connecttimeoutms": + try + { + cfg.journal = to!bool(value); + } catch (Exception e) { + logError("Value for journal must be true or false but was %s", value); + } + + break; + + case "connecttimeoutms": + try + { + cfg.connectTimeoutMS = to!long(value); + } catch (Exception e) { + logError("Invalid connectTimeoutMS value: [%s] Should be an integer number", value); + } + + logWarn("MongoDB option %s not yet implemented.", option); + break; + case "sockettimeoutms": - logWarn("MongoDB option {} not yet implemented.", option); + try + { + cfg.socketTimeoutMS = to!long(value); + } catch (Exception e) { + logError("Invalid socketTimeoutMS value: [%s] Should be an integer number", value); + } + + logWarn("MongoDB option %s not yet implemented.", option); break; // Catch-all default: - logWarn("Unknown MongoDB option {}", option); + logWarn("Unknown MongoDB option %s", option); } - - // Store the options in string format in case we want them later. - cfg.options[option] = value; } + + /* Some settings imply safe. If they are set, set safe to true regardless + * of what it was set to in the URL string + */ + if( (cfg.w != Bson.init) || (cfg.wTimeoutMS != long.init) || + cfg.journal || cfg.fsync ) + { + cfg.safe = true; + } } return true; @@ -379,26 +492,32 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) /* Test for parseMongoDBUrl */ unittest { - MongoConnectionConfig cfg; + MongoClientSettings cfg; assert(parseMongoDBUrl(cfg, "mongodb://localhost")); assert(cfg.hosts.length == 1); assert(cfg.database == ""); - assert(cfg.options.length == 0); assert(cfg.hosts[0].name == "localhost"); assert(cfg.hosts[0].port == 27017); + assert(cfg.defQueryFlags == QueryFlags.None); + assert(cfg.replicaSet == ""); + assert(cfg.safe == false); + assert(cfg.w == Bson.init); + assert(cfg.wTimeoutMS == long.init); + assert(cfg.fsync == false); + assert(cfg.journal == false); + assert(cfg.connectTimeoutMS == long.init); + assert(cfg.socketTimeoutMS == long.init); - cfg = MongoConnectionConfig.init; + cfg = MongoClientSettings.init; assert(parseMongoDBUrl(cfg, "mongodb://fred:foobar@localhost")); assert(cfg.username == "fred"); - assert(cfg.password == "foobar"); assert(cfg.hosts.length == 1); assert(cfg.database == ""); - assert(cfg.options.length == 0); assert(cfg.hosts[0].name == "localhost"); assert(cfg.hosts[0].port == 27017); - cfg = MongoConnectionConfig.init; + cfg = MongoClientSettings.init; assert(parseMongoDBUrl(cfg, "mongodb://fred:@localhost/baz")); assert(cfg.username == "fred"); assert(cfg.password == ""); @@ -406,10 +525,8 @@ unittest assert(cfg.hosts.length == 1); assert(cfg.hosts[0].name == "localhost"); assert(cfg.hosts[0].port == 27017); - assert(cfg.options.length == 0); - assert(cfg.defQueryFlags == QueryFlags.None); - cfg = MongoConnectionConfig.init; + cfg = MongoClientSettings.init; assert(parseMongoDBUrl(cfg, "mongodb://host1,host2,host3/?safe=true&w=2&wtimeoutMS=2000&slaveOk=false")); assert(cfg.username == ""); assert(cfg.password == ""); @@ -421,17 +538,15 @@ unittest assert(cfg.hosts[1].port == 27017); assert(cfg.hosts[2].name == "host3"); assert(cfg.hosts[2].port == 27017); - assert(cfg.options.length == 3); - assert(cfg.options["safe"] == "true"); - assert(cfg.options["w"] == "2"); - assert(cfg.options["wtimeoutms"] == "2000"); - assert(cfg.options["slaveok"] == "false"); - assert(cfg.defQueryFlags == QueryFlags.None); + assert(cfg.safe == true); + assert(cfg.w == Bson(2)); + assert(cfg.wTimeoutMS == 2000); + assert(cfg.defQueryFlags == QueryFlags.SlaveOk); - cfg = MongoConnectionConfig.init; + cfg = MongoClientSettings.init; assert(parseMongoDBUrl(cfg, "mongodb://fred:flinstone@host1.example.com,host2.other.example.com:27108,host3:" - "27019/mydb?safe=true;w=2;wtimeoutMS=2000;slaveok=true")); + "27019/mydb?journal=true;fsync=true;connectMS=1500;sockettimeoutMs=1000;w=majority")); assert(cfg.username == "fred"); assert(cfg.password == "flinstone"); assert(cfg.database == "mydb"); @@ -441,16 +556,16 @@ unittest assert(cfg.hosts[1].name == "host2.other.example.com"); assert(cfg.hosts[1].port == 27108); assert(cfg.hosts[2].name == "host3"); - assert(cfg.hosts[2].port == 27019); - assert(cfg.options.length == 3); - assert(cfg.options["safe"] == "true"); - assert(cfg.options["w"] == "2"); - assert(cfg.options["wtimeoutms"] == "2000"); - assert(cfg.options["slaveok"] == "true"); - assert(cfg.defQueryFlags & QueryFlags.SlaveOk); + assert(cfg.hosts[2].port == 27019);assert(cfg.fsync == false); + assert(cfg.fsync == true); + assert(cfg.journal == true); + assert(cfg.connectTimeoutMS == 1500); + assert(cfg.socketTimeoutMS == 1000); + assert(cfg.w == Bson("majority")); + assert(cfg.safe == true); // Invalid URLs - these should fail to parse - cfg = MongoConnectionConfig.init; + cfg = MongoClientSettings.init; assert(! (parseMongoDBUrl(cfg, "localhost:27018"))); assert(! (parseMongoDBUrl(cfg, "http://blah"))); assert(! (parseMongoDBUrl(cfg, "mongodb://@localhost"))); @@ -531,17 +646,24 @@ private class Message { void addBSON(Bson v) { m_data.put(v.data); } } -struct MongoConnectionConfig +class MongoClientSettings { string username; string password; MongoHost[] hosts; string database; - string[string] options; QueryFlags defQueryFlags = QueryFlags.None; + string replicaSet; + bool safe; + Bson w; // Either a number or the string 'majority' + long wTimeoutMS; + bool fsync; + bool journal; + long connectTimeoutMS; + long socketTimeoutMS; } -struct MongoHost +class MongoHost { string name; ushort port; diff --git a/source/vibe/db/mongo/db.d b/source/vibe/db/mongo/db.d index d8efc2f75d..890ee74376 100644 --- a/source/vibe/db/mongo/db.d +++ b/source/vibe/db/mongo/db.d @@ -16,13 +16,14 @@ import vibe.db.mongo.connection; import core.thread; import std.conv; +import std.string; /** Represents a single remote MongoDB. */ class MongoDB { private { - MongoConnectionConfig config; + MongoClientSettings settings; ConnectionPool!MongoConnection m_connections; } @@ -34,22 +35,22 @@ class MongoDB { } /** - * Throws an exception if the URL cannot be parsed as a valid MongoDB URL. + * Throws: an exception if the URL cannot be parsed as a valid MongoDB URL. * * Url must be in the form documented at - * http://www.mongodb.org/display/DOCS/Connections which is: + * $(LINK http://www.mongodb.org/display/DOCS/Connections) which is: * * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] * */ package this(string url) { - auto goodUrl = parseMongoDBUrl(config, url); + auto goodUrl = parseMongoDBUrl(settings, url); if(!goodUrl) throw new Exception("Unable to parse mongodb URL: " ~ url); m_connections = new ConnectionPool!MongoConnection({ - auto ret = new MongoConnection(config); + auto ret = new MongoConnection(settings); ret.connect(); return ret; }); @@ -58,28 +59,76 @@ class MongoDB { /** Runs a command on the specified database. - See_Also: http://www.mongodb.org/display/DOCS/Commands + See_Also: $(LINK http://www.mongodb.org/display/DOCS/Commands) */ Bson runCommand(string db, Bson[string] command_and_options) { - return this[db~".$cmd"].findOne(command_and_options); + return getCollection(db~".$cmd").findOne(command_and_options); } - /// See http://www.mongodb.org/display/DOCS/getLog+Command + /// See $(LINK http://www.mongodb.org/display/DOCS/getLog+Command) Bson getLog(string db, string mask){ return runCommand(db, ["getLog" : Bson(mask)]); } - /// http://www.mongodb.org/display/DOCS/fsync+Command + /// See $(LINK http://www.mongodb.org/display/DOCS/fsync+Command) Bson fsync(string db, bool async = false){ return runCommand(db, ["fsync" : Bson(1), "async" : Bson(async)]); } - /// http://www.mongodb.org/display/DOCS/getLastError+Command + /// See $(LINK http://www.mongodb.org/display/DOCS/getLastError+Command) Bson getLastError(string db){ return runCommand(db, ["getlasterror" : Bson(1)]); } /** Accesses the collections inside this DB. + + Examples: + --- + auto db = connectMongoDB("mongodb://localhost/mydatabase"); + auto col = db["mycollection"]; + + auto db = connectMongoDB("mongodb://localhost"); + auto col = db["mydatabase.mycollection"]; + --- Throws: Exception if a DB communication error occured. */ - MongoCollection opIndex(string name) { return MongoCollection(this, name); } + MongoCollection opIndex(string name) + { + string realname; + + // If a database has been set in the MongoClientSettings prepend that name. + if(settings.database != string.init) + { + if(name.startsWith(".")) + realname = settings.database ~ name; + else + realname = settings.database ~ "." ~ name; + } else { + realname = name; + } + + logTrace("Returning collection for '%s' in response to request for '%s'", realname, name); + + return MongoCollection(this, realname); + } + + /** + * Return: MongoCollection for the given database and collecting specified. + * + * If a default database has been set in the MongoClientSettings it is NOT used here. + * The full database.collection path must be specified. + * + * Example: + * --- + * auto col = db.getCollection("mydb.mycollection"); + * --- + * + * The opIndex function should be used to get a relative collection name where the + * default database is taken into consideration. + * + * Most user code should use opIndex. + */ + MongoCollection getCollection(string db_and_col) + { + return MongoCollection(this, db_and_col); + } package auto lockConnection() { return m_connections.lockConnection(); } } \ No newline at end of file diff --git a/source/vibe/db/mongo/mongo.d b/source/vibe/db/mongo/mongo.d index 89a6ffa64c..8cc27591e8 100644 --- a/source/vibe/db/mongo/mongo.d +++ b/source/vibe/db/mongo/mongo.d @@ -25,6 +25,9 @@ import std.algorithm; --- auto db = connectMongoDB("mongodb://localhost/?slaveOk=true"); --- + + Throws: an exception if a mongodb:// URL is given and the URL cannot be parsed. + An exception will not be thrown if called with a hostname and port. */ MongoDB connectMongoDB(string host, ushort port = MongoDB.defaultPort) {