Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongodb url #81

Merged
merged 2 commits into from
Aug 1, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 177 additions & 55 deletions source/vibe/db/mongo/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,30 @@ import std.string;
*/
class MongoConnection : EventedObject {
private {
MongoConnectionConfig config;
MongoClientSettings settings;
TcpConnection m_conn;
ulong m_bytesRead;
int m_msgid = 1;
}

this(string server, ushort port = 27017)
{
config.hosts ~= MongoHost(server, port);
settings = new MongoClientSettings();
settings.hosts ~= new MongoHost(server, port);
}

this(MongoConnectionConfig cfg)
this(MongoClientSettings cfg)
{
config = cfg;
settings = cfg;

// Now let's check for features that are not yet supported.
if(config.hosts.length > 1)
logWarn("Multiple mongodb hosts are not yet supported. Using first one: {}:{}",
config.hosts[0].name, config.hosts[0].port);
if(config.username != "")
logWarn("MongoDB username is not yet supported. Ignoring username: {}", config.username);
if(config.password != "")
if(settings.hosts.length > 1)
logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
settings.hosts[0].name, settings.hosts[0].port);
if(settings.username != string.init)
logWarn("MongoDB username is not yet supported. Ignoring username: %s", settings.username);
if(settings.password != string.init)
logWarn("MongoDB password is not yet supported. Ignoring password.");
if(config.database != "")
logWarn("MongoDB database is not yet supported. Ignoring database value: {}", config.database);
}

// changes the ownership of this connection
Expand All @@ -74,7 +73,7 @@ class MongoConnection : EventedObject {
* TODO: Connect to one of the specified hosts taking into consideration
* options such as connect timeouts and so on.
*/
m_conn = connectTcp(config.hosts[0].name, config.hosts[0].port);
m_conn = connectTcp(settings.hosts[0].name, settings.hosts[0].port);
m_bytesRead = 0;
}

Expand All @@ -98,6 +97,10 @@ class MongoConnection : EventedObject {
msg.addBSON(selector);
msg.addBSON(update);
send(msg);
if(settings.safe)
{
safeModeLastError(collection_name);
}
}

void insert(string collection_name, InsertFlags flags, Bson[] documents)
Expand All @@ -111,13 +114,18 @@ class MongoConnection : EventedObject {
msg.addBSON(d);
}
send(msg);

if(settings.safe)
{
safeModeLastError(collection_name);
}
}

Reply query(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector = Bson(null))
{
scope(failure) disconnect();
scope msg = new Message(OpCode.Query);
msg.addInt(flags | config.defQueryFlags);
msg.addInt(flags | settings.defQueryFlags);
msg.addCString(collection_name);
msg.addInt(nskip);
msg.addInt(nret);
Expand Down Expand Up @@ -147,6 +155,10 @@ class MongoConnection : EventedObject {
msg.addInt(flags);
msg.addBSON(selector);
send(msg);
if(settings.safe)
{
safeModeLastError(collection_name);
}
}

void killCursors(long[] cursors)
Expand Down Expand Up @@ -234,21 +246,44 @@ class MongoConnection : EventedObject {
private void recv(ubyte[] dst) { enforce(m_conn); m_conn.read(dst); m_bytesRead += dst.length; }

private int nextMessageId() { return m_msgid++; }

private Reply safeModeLastError(string collection_name)
{
Bson[string] command_and_options = ["getlasterror": Bson(1)];

if(settings.w != settings.w.init)
command_and_options["w"] = settings.w; // Already a Bson struct
if(settings.wTimeoutMS != settings.wTimeoutMS.init)
command_and_options["wtimeout"] = Bson(settings.wTimeoutMS);
if(settings.journal)
command_and_options["j"] = Bson(true);
if(settings.fsync)
command_and_options["fsync"] = Bson(true);

logTrace("Running safeModeLastError on %s", collection_name);

Reply results = query(collection_name, QueryFlags.None | settings.defQueryFlags,
0, 0, serializeToBson(command_and_options));
return results;
}
}

/**
* Parses the given string as a mongodb URL. Url must be in the form documented at
* http://www.mongodb.org/display/DOCS/Connections which is:
* $(LINK http://www.mongodb.org/display/DOCS/Connections) which is:
*
* mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
*
* Returns true if the URL was successfully parsed. Returns false if the URL can not be parsed.
* If the URL is successfully parsed the MongoConfig struct will contain the parsed data.
* If the URL is not successfully parsed the information in the MongoConfig struct may be
* Returns: true if the URL was successfully parsed. False if the URL can not be parsed.
*
* If the URL is successfully parsed the MongoClientSettings instance will contain the parsed config.
* If the URL is not successfully parsed the information in the MongoClientSettings instance may be
* incomplete and should not be used.
*/
bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url)
bool parseMongoDBUrl(out MongoClientSettings cfg, string url)
{
cfg = new MongoClientSettings();

string tmpUrl = url[0..$]; // Slice of the url (not a copy)

if(!startsWith(tmpUrl, "mongodb://"))
Expand Down Expand Up @@ -301,7 +336,7 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url)
port = to!ushort(hostPort.front);
}

cfg.hosts ~= MongoHost(host, port);
cfg.hosts ~= new MongoHost(host, port);
}
} catch ( Exception e) {
return false; // Probably failed converting the port to ushort.
Expand Down Expand Up @@ -348,29 +383,107 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url)
auto setting = to!bool(value);
if(setting) cfg.defQueryFlags |= QueryFlags.SlaveOk;
} catch (Exception e) {
logError("Value for slaveOk must be true or false but was {}", value);
logError("Value for slaveOk must be true or false but was %s", value);
}
break;

// These options aren't implemented yet so we'll warn on them.
case "replicaset":
cfg.replicaSet = value;

logWarn("MongoDB option %s not yet implemented.", option);
break;

case "safe":
try
{
cfg.safe = to!bool(value);
} catch (Exception e) {
logError("Value for safe must be true or false but was %s", value);
}
break;

case "w":
try
{
if(icmp(value, "majority") == 0)
{
cfg.w = Bson("majority");
} else {
cfg.w = Bson(to!long(value));
}
} catch (Exception e) {
logError("Invalid w value: [%s] Should be an integer number or 'majority'", value);
}

break;

case "wtimeoutms":
try
{
cfg.wTimeoutMS = to!long(value);
} catch (Exception e) {
logError("Invalid wTimeoutMS value: [%s] Should be an integer number", value);
}

break;

case "fsync":
try
{
cfg.fsync = to!bool(value);
} catch (Exception e) {
logError("Value for fsync must be true or false but was %s", value);
}

break;

case "journal":
case "connecttimeoutms":
try
{
cfg.journal = to!bool(value);
} catch (Exception e) {
logError("Value for journal must be true or false but was %s", value);
}

break;

case "connecttimeoutms":
try
{
cfg.connectTimeoutMS = to!long(value);
} catch (Exception e) {
logError("Invalid connectTimeoutMS value: [%s] Should be an integer number", value);
}

logWarn("MongoDB option %s not yet implemented.", option);
break;

case "sockettimeoutms":
logWarn("MongoDB option {} not yet implemented.", option);
try
{
cfg.socketTimeoutMS = to!long(value);
} catch (Exception e) {
logError("Invalid socketTimeoutMS value: [%s] Should be an integer number", value);
}

logWarn("MongoDB option %s not yet implemented.", option);
break;

// Catch-all
default:
logWarn("Unknown MongoDB option {}", option);
logWarn("Unknown MongoDB option %s", option);
}

// Store the options in string format in case we want them later.
cfg.options[option] = value;
}

/* Some settings imply safe. If they are set, set safe to true regardless
* of what it was set to in the URL string
*/
if( (cfg.w != Bson.init) || (cfg.wTimeoutMS != long.init) ||
cfg.journal || cfg.fsync )
{
cfg.safe = true;
}
}

return true;
Expand All @@ -379,37 +492,41 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url)
/* Test for parseMongoDBUrl */
unittest
{
MongoConnectionConfig cfg;
MongoClientSettings cfg;

assert(parseMongoDBUrl(cfg, "mongodb://localhost"));
assert(cfg.hosts.length == 1);
assert(cfg.database == "");
assert(cfg.options.length == 0);
assert(cfg.hosts[0].name == "localhost");
assert(cfg.hosts[0].port == 27017);
assert(cfg.defQueryFlags == QueryFlags.None);
assert(cfg.replicaSet == "");
assert(cfg.safe == false);
assert(cfg.w == Bson.init);
assert(cfg.wTimeoutMS == long.init);
assert(cfg.fsync == false);
assert(cfg.journal == false);
assert(cfg.connectTimeoutMS == long.init);
assert(cfg.socketTimeoutMS == long.init);

cfg = MongoConnectionConfig.init;
cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg, "mongodb://fred:foobar@localhost"));
assert(cfg.username == "fred");
assert(cfg.password == "foobar");
assert(cfg.hosts.length == 1);
assert(cfg.database == "");
assert(cfg.options.length == 0);
assert(cfg.hosts[0].name == "localhost");
assert(cfg.hosts[0].port == 27017);

cfg = MongoConnectionConfig.init;
cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg, "mongodb://fred:@localhost/baz"));
assert(cfg.username == "fred");
assert(cfg.password == "");
assert(cfg.database == "baz");
assert(cfg.hosts.length == 1);
assert(cfg.hosts[0].name == "localhost");
assert(cfg.hosts[0].port == 27017);
assert(cfg.options.length == 0);
assert(cfg.defQueryFlags == QueryFlags.None);

cfg = MongoConnectionConfig.init;
cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg, "mongodb://host1,host2,host3/?safe=true&w=2&wtimeoutMS=2000&slaveOk=false"));
assert(cfg.username == "");
assert(cfg.password == "");
Expand All @@ -421,17 +538,15 @@ unittest
assert(cfg.hosts[1].port == 27017);
assert(cfg.hosts[2].name == "host3");
assert(cfg.hosts[2].port == 27017);
assert(cfg.options.length == 3);
assert(cfg.options["safe"] == "true");
assert(cfg.options["w"] == "2");
assert(cfg.options["wtimeoutms"] == "2000");
assert(cfg.options["slaveok"] == "false");
assert(cfg.defQueryFlags == QueryFlags.None);
assert(cfg.safe == true);
assert(cfg.w == Bson(2));
assert(cfg.wTimeoutMS == 2000);
assert(cfg.defQueryFlags == QueryFlags.SlaveOk);

cfg = MongoConnectionConfig.init;
cfg = MongoClientSettings.init;
assert(parseMongoDBUrl(cfg,
"mongodb://fred:[email protected],host2.other.example.com:27108,host3:"
"27019/mydb?safe=true;w=2;wtimeoutMS=2000;slaveok=true"));
"27019/mydb?journal=true;fsync=true;connectMS=1500;sockettimeoutMs=1000;w=majority"));
assert(cfg.username == "fred");
assert(cfg.password == "flinstone");
assert(cfg.database == "mydb");
Expand All @@ -441,16 +556,16 @@ unittest
assert(cfg.hosts[1].name == "host2.other.example.com");
assert(cfg.hosts[1].port == 27108);
assert(cfg.hosts[2].name == "host3");
assert(cfg.hosts[2].port == 27019);
assert(cfg.options.length == 3);
assert(cfg.options["safe"] == "true");
assert(cfg.options["w"] == "2");
assert(cfg.options["wtimeoutms"] == "2000");
assert(cfg.options["slaveok"] == "true");
assert(cfg.defQueryFlags & QueryFlags.SlaveOk);
assert(cfg.hosts[2].port == 27019);assert(cfg.fsync == false);
assert(cfg.fsync == true);
assert(cfg.journal == true);
assert(cfg.connectTimeoutMS == 1500);
assert(cfg.socketTimeoutMS == 1000);
assert(cfg.w == Bson("majority"));
assert(cfg.safe == true);

// Invalid URLs - these should fail to parse
cfg = MongoConnectionConfig.init;
cfg = MongoClientSettings.init;
assert(! (parseMongoDBUrl(cfg, "localhost:27018")));
assert(! (parseMongoDBUrl(cfg, "http://blah")));
assert(! (parseMongoDBUrl(cfg, "mongodb://@localhost")));
Expand Down Expand Up @@ -531,17 +646,24 @@ private class Message {
void addBSON(Bson v) { m_data.put(v.data); }
}

struct MongoConnectionConfig
class MongoClientSettings
{
string username;
string password;
MongoHost[] hosts;
string database;
string[string] options;
QueryFlags defQueryFlags = QueryFlags.None;
string replicaSet;
bool safe;
Bson w; // Either a number or the string 'majority'
long wTimeoutMS;
bool fsync;
bool journal;
long connectTimeoutMS;
long socketTimeoutMS;
}

struct MongoHost
class MongoHost
{
string name;
ushort port;
Expand Down
Loading