-
Notifications
You must be signed in to change notification settings - Fork 284
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #81 from davideagen/mongodb_url
Mongodb url
- Loading branch information
Showing
3 changed files
with
240 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,31 +26,30 @@ import std.string; | |
*/ | ||
class MongoConnection : EventedObject { | ||
private { | ||
MongoConnectionConfig config; | ||
MongoClientSettings settings; | ||
TcpConnection m_conn; | ||
ulong m_bytesRead; | ||
int m_msgid = 1; | ||
} | ||
|
||
this(string server, ushort port = 27017) | ||
{ | ||
config.hosts ~= MongoHost(server, port); | ||
settings = new MongoClientSettings(); | ||
settings.hosts ~= new MongoHost(server, port); | ||
} | ||
|
||
this(MongoConnectionConfig cfg) | ||
this(MongoClientSettings cfg) | ||
{ | ||
config = cfg; | ||
settings = cfg; | ||
|
||
// Now let's check for features that are not yet supported. | ||
if(config.hosts.length > 1) | ||
logWarn("Multiple mongodb hosts are not yet supported. Using first one: {}:{}", | ||
config.hosts[0].name, config.hosts[0].port); | ||
if(config.username != "") | ||
logWarn("MongoDB username is not yet supported. Ignoring username: {}", config.username); | ||
if(config.password != "") | ||
if(settings.hosts.length > 1) | ||
logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", | ||
settings.hosts[0].name, settings.hosts[0].port); | ||
if(settings.username != string.init) | ||
logWarn("MongoDB username is not yet supported. Ignoring username: %s", settings.username); | ||
if(settings.password != string.init) | ||
logWarn("MongoDB password is not yet supported. Ignoring password."); | ||
if(config.database != "") | ||
logWarn("MongoDB database is not yet supported. Ignoring database value: {}", config.database); | ||
} | ||
|
||
// changes the ownership of this connection | ||
|
@@ -74,7 +73,7 @@ class MongoConnection : EventedObject { | |
* TODO: Connect to one of the specified hosts taking into consideration | ||
* options such as connect timeouts and so on. | ||
*/ | ||
m_conn = connectTcp(config.hosts[0].name, config.hosts[0].port); | ||
m_conn = connectTcp(settings.hosts[0].name, settings.hosts[0].port); | ||
m_bytesRead = 0; | ||
} | ||
|
||
|
@@ -98,6 +97,10 @@ class MongoConnection : EventedObject { | |
msg.addBSON(selector); | ||
msg.addBSON(update); | ||
send(msg); | ||
if(settings.safe) | ||
{ | ||
safeModeLastError(collection_name); | ||
} | ||
} | ||
|
||
void insert(string collection_name, InsertFlags flags, Bson[] documents) | ||
|
@@ -111,13 +114,18 @@ class MongoConnection : EventedObject { | |
msg.addBSON(d); | ||
} | ||
send(msg); | ||
|
||
if(settings.safe) | ||
{ | ||
safeModeLastError(collection_name); | ||
} | ||
} | ||
|
||
Reply query(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector = Bson(null)) | ||
{ | ||
scope(failure) disconnect(); | ||
scope msg = new Message(OpCode.Query); | ||
msg.addInt(flags | config.defQueryFlags); | ||
msg.addInt(flags | settings.defQueryFlags); | ||
msg.addCString(collection_name); | ||
msg.addInt(nskip); | ||
msg.addInt(nret); | ||
|
@@ -147,6 +155,10 @@ class MongoConnection : EventedObject { | |
msg.addInt(flags); | ||
msg.addBSON(selector); | ||
send(msg); | ||
if(settings.safe) | ||
{ | ||
safeModeLastError(collection_name); | ||
} | ||
} | ||
|
||
void killCursors(long[] cursors) | ||
|
@@ -234,21 +246,44 @@ class MongoConnection : EventedObject { | |
private void recv(ubyte[] dst) { enforce(m_conn); m_conn.read(dst); m_bytesRead += dst.length; } | ||
|
||
private int nextMessageId() { return m_msgid++; } | ||
|
||
private Reply safeModeLastError(string collection_name) | ||
{ | ||
Bson[string] command_and_options = ["getlasterror": Bson(1)]; | ||
|
||
if(settings.w != settings.w.init) | ||
command_and_options["w"] = settings.w; // Already a Bson struct | ||
if(settings.wTimeoutMS != settings.wTimeoutMS.init) | ||
command_and_options["wtimeout"] = Bson(settings.wTimeoutMS); | ||
if(settings.journal) | ||
command_and_options["j"] = Bson(true); | ||
if(settings.fsync) | ||
command_and_options["fsync"] = Bson(true); | ||
|
||
logTrace("Running safeModeLastError on %s", collection_name); | ||
|
||
Reply results = query(collection_name, QueryFlags.None | settings.defQueryFlags, | ||
0, 0, serializeToBson(command_and_options)); | ||
return results; | ||
} | ||
} | ||
|
||
/** | ||
* Parses the given string as a mongodb URL. Url must be in the form documented at | ||
* http://www.mongodb.org/display/DOCS/Connections which is: | ||
* $(LINK http://www.mongodb.org/display/DOCS/Connections) which is: | ||
* | ||
* mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] | ||
* | ||
* Returns true if the URL was successfully parsed. Returns false if the URL can not be parsed. | ||
* If the URL is successfully parsed the MongoConfig struct will contain the parsed data. | ||
* If the URL is not successfully parsed the information in the MongoConfig struct may be | ||
* Returns: true if the URL was successfully parsed. False if the URL can not be parsed. | ||
* | ||
* If the URL is successfully parsed the MongoClientSettings instance will contain the parsed config. | ||
* If the URL is not successfully parsed the information in the MongoClientSettings instance may be | ||
* incomplete and should not be used. | ||
*/ | ||
bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) | ||
bool parseMongoDBUrl(out MongoClientSettings cfg, string url) | ||
{ | ||
cfg = new MongoClientSettings(); | ||
|
||
string tmpUrl = url[0..$]; // Slice of the url (not a copy) | ||
|
||
if(!startsWith(tmpUrl, "mongodb://")) | ||
|
@@ -301,7 +336,7 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) | |
port = to!ushort(hostPort.front); | ||
} | ||
|
||
cfg.hosts ~= MongoHost(host, port); | ||
cfg.hosts ~= new MongoHost(host, port); | ||
} | ||
} catch ( Exception e) { | ||
return false; // Probably failed converting the port to ushort. | ||
|
@@ -348,29 +383,107 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) | |
auto setting = to!bool(value); | ||
if(setting) cfg.defQueryFlags |= QueryFlags.SlaveOk; | ||
} catch (Exception e) { | ||
logError("Value for slaveOk must be true or false but was {}", value); | ||
logError("Value for slaveOk must be true or false but was %s", value); | ||
} | ||
break; | ||
|
||
// These options aren't implemented yet so we'll warn on them. | ||
case "replicaset": | ||
cfg.replicaSet = value; | ||
|
||
logWarn("MongoDB option %s not yet implemented.", option); | ||
break; | ||
|
||
case "safe": | ||
try | ||
{ | ||
cfg.safe = to!bool(value); | ||
} catch (Exception e) { | ||
logError("Value for safe must be true or false but was %s", value); | ||
} | ||
break; | ||
|
||
case "w": | ||
try | ||
{ | ||
if(icmp(value, "majority") == 0) | ||
{ | ||
cfg.w = Bson("majority"); | ||
} else { | ||
cfg.w = Bson(to!long(value)); | ||
} | ||
} catch (Exception e) { | ||
logError("Invalid w value: [%s] Should be an integer number or 'majority'", value); | ||
} | ||
|
||
break; | ||
|
||
case "wtimeoutms": | ||
try | ||
{ | ||
cfg.wTimeoutMS = to!long(value); | ||
} catch (Exception e) { | ||
logError("Invalid wTimeoutMS value: [%s] Should be an integer number", value); | ||
} | ||
|
||
break; | ||
|
||
case "fsync": | ||
try | ||
{ | ||
cfg.fsync = to!bool(value); | ||
} catch (Exception e) { | ||
logError("Value for fsync must be true or false but was %s", value); | ||
} | ||
|
||
break; | ||
|
||
case "journal": | ||
case "connecttimeoutms": | ||
try | ||
{ | ||
cfg.journal = to!bool(value); | ||
} catch (Exception e) { | ||
logError("Value for journal must be true or false but was %s", value); | ||
} | ||
|
||
break; | ||
|
||
case "connecttimeoutms": | ||
try | ||
{ | ||
cfg.connectTimeoutMS = to!long(value); | ||
} catch (Exception e) { | ||
logError("Invalid connectTimeoutMS value: [%s] Should be an integer number", value); | ||
} | ||
|
||
logWarn("MongoDB option %s not yet implemented.", option); | ||
break; | ||
|
||
case "sockettimeoutms": | ||
logWarn("MongoDB option {} not yet implemented.", option); | ||
try | ||
{ | ||
cfg.socketTimeoutMS = to!long(value); | ||
} catch (Exception e) { | ||
logError("Invalid socketTimeoutMS value: [%s] Should be an integer number", value); | ||
} | ||
|
||
logWarn("MongoDB option %s not yet implemented.", option); | ||
break; | ||
|
||
// Catch-all | ||
default: | ||
logWarn("Unknown MongoDB option {}", option); | ||
logWarn("Unknown MongoDB option %s", option); | ||
} | ||
|
||
// Store the options in string format in case we want them later. | ||
cfg.options[option] = value; | ||
} | ||
|
||
/* Some settings imply safe. If they are set, set safe to true regardless | ||
* of what it was set to in the URL string | ||
*/ | ||
if( (cfg.w != Bson.init) || (cfg.wTimeoutMS != long.init) || | ||
cfg.journal || cfg.fsync ) | ||
{ | ||
cfg.safe = true; | ||
} | ||
} | ||
|
||
return true; | ||
|
@@ -379,37 +492,41 @@ bool parseMongoDBUrl(out MongoConnectionConfig cfg, string url) | |
/* Test for parseMongoDBUrl */ | ||
unittest | ||
{ | ||
MongoConnectionConfig cfg; | ||
MongoClientSettings cfg; | ||
|
||
assert(parseMongoDBUrl(cfg, "mongodb://localhost")); | ||
assert(cfg.hosts.length == 1); | ||
assert(cfg.database == ""); | ||
assert(cfg.options.length == 0); | ||
assert(cfg.hosts[0].name == "localhost"); | ||
assert(cfg.hosts[0].port == 27017); | ||
assert(cfg.defQueryFlags == QueryFlags.None); | ||
assert(cfg.replicaSet == ""); | ||
assert(cfg.safe == false); | ||
assert(cfg.w == Bson.init); | ||
assert(cfg.wTimeoutMS == long.init); | ||
assert(cfg.fsync == false); | ||
assert(cfg.journal == false); | ||
assert(cfg.connectTimeoutMS == long.init); | ||
assert(cfg.socketTimeoutMS == long.init); | ||
|
||
cfg = MongoConnectionConfig.init; | ||
cfg = MongoClientSettings.init; | ||
assert(parseMongoDBUrl(cfg, "mongodb://fred:foobar@localhost")); | ||
assert(cfg.username == "fred"); | ||
assert(cfg.password == "foobar"); | ||
assert(cfg.hosts.length == 1); | ||
assert(cfg.database == ""); | ||
assert(cfg.options.length == 0); | ||
assert(cfg.hosts[0].name == "localhost"); | ||
assert(cfg.hosts[0].port == 27017); | ||
|
||
cfg = MongoConnectionConfig.init; | ||
cfg = MongoClientSettings.init; | ||
assert(parseMongoDBUrl(cfg, "mongodb://fred:@localhost/baz")); | ||
assert(cfg.username == "fred"); | ||
assert(cfg.password == ""); | ||
assert(cfg.database == "baz"); | ||
assert(cfg.hosts.length == 1); | ||
assert(cfg.hosts[0].name == "localhost"); | ||
assert(cfg.hosts[0].port == 27017); | ||
assert(cfg.options.length == 0); | ||
assert(cfg.defQueryFlags == QueryFlags.None); | ||
|
||
cfg = MongoConnectionConfig.init; | ||
cfg = MongoClientSettings.init; | ||
assert(parseMongoDBUrl(cfg, "mongodb://host1,host2,host3/?safe=true&w=2&wtimeoutMS=2000&slaveOk=false")); | ||
assert(cfg.username == ""); | ||
assert(cfg.password == ""); | ||
|
@@ -421,17 +538,15 @@ unittest | |
assert(cfg.hosts[1].port == 27017); | ||
assert(cfg.hosts[2].name == "host3"); | ||
assert(cfg.hosts[2].port == 27017); | ||
assert(cfg.options.length == 3); | ||
assert(cfg.options["safe"] == "true"); | ||
assert(cfg.options["w"] == "2"); | ||
assert(cfg.options["wtimeoutms"] == "2000"); | ||
assert(cfg.options["slaveok"] == "false"); | ||
assert(cfg.defQueryFlags == QueryFlags.None); | ||
assert(cfg.safe == true); | ||
assert(cfg.w == Bson(2)); | ||
assert(cfg.wTimeoutMS == 2000); | ||
assert(cfg.defQueryFlags == QueryFlags.SlaveOk); | ||
|
||
cfg = MongoConnectionConfig.init; | ||
cfg = MongoClientSettings.init; | ||
assert(parseMongoDBUrl(cfg, | ||
"mongodb://fred:[email protected],host2.other.example.com:27108,host3:" | ||
"27019/mydb?safe=true;w=2;wtimeoutMS=2000;slaveok=true")); | ||
"27019/mydb?journal=true;fsync=true;connectMS=1500;sockettimeoutMs=1000;w=majority")); | ||
assert(cfg.username == "fred"); | ||
assert(cfg.password == "flinstone"); | ||
assert(cfg.database == "mydb"); | ||
|
@@ -441,16 +556,16 @@ unittest | |
assert(cfg.hosts[1].name == "host2.other.example.com"); | ||
assert(cfg.hosts[1].port == 27108); | ||
assert(cfg.hosts[2].name == "host3"); | ||
assert(cfg.hosts[2].port == 27019); | ||
assert(cfg.options.length == 3); | ||
assert(cfg.options["safe"] == "true"); | ||
assert(cfg.options["w"] == "2"); | ||
assert(cfg.options["wtimeoutms"] == "2000"); | ||
assert(cfg.options["slaveok"] == "true"); | ||
assert(cfg.defQueryFlags & QueryFlags.SlaveOk); | ||
assert(cfg.hosts[2].port == 27019);assert(cfg.fsync == false); | ||
assert(cfg.fsync == true); | ||
assert(cfg.journal == true); | ||
assert(cfg.connectTimeoutMS == 1500); | ||
assert(cfg.socketTimeoutMS == 1000); | ||
assert(cfg.w == Bson("majority")); | ||
assert(cfg.safe == true); | ||
|
||
// Invalid URLs - these should fail to parse | ||
cfg = MongoConnectionConfig.init; | ||
cfg = MongoClientSettings.init; | ||
assert(! (parseMongoDBUrl(cfg, "localhost:27018"))); | ||
assert(! (parseMongoDBUrl(cfg, "http://blah"))); | ||
assert(! (parseMongoDBUrl(cfg, "mongodb://@localhost"))); | ||
|
@@ -531,17 +646,24 @@ private class Message { | |
void addBSON(Bson v) { m_data.put(v.data); } | ||
} | ||
|
||
struct MongoConnectionConfig | ||
class MongoClientSettings | ||
{ | ||
string username; | ||
string password; | ||
MongoHost[] hosts; | ||
string database; | ||
string[string] options; | ||
QueryFlags defQueryFlags = QueryFlags.None; | ||
string replicaSet; | ||
bool safe; | ||
Bson w; // Either a number or the string 'majority' | ||
long wTimeoutMS; | ||
bool fsync; | ||
bool journal; | ||
long connectTimeoutMS; | ||
long socketTimeoutMS; | ||
} | ||
|
||
struct MongoHost | ||
class MongoHost | ||
{ | ||
string name; | ||
ushort port; | ||
|
Oops, something went wrong.