Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis connection pooling #199

Merged
merged 2 commits into from
Mar 30, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 65 additions & 30 deletions source/vibe/db/redis/redis.d
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module vibe.db.redis.redis;

public import vibe.core.net;

import vibe.core.connectionpool;
import vibe.core.log;
import vibe.stream.operations;
import std.string;
Expand Down Expand Up @@ -81,7 +82,7 @@ final class RedisReply {
}
}

final class RedisClient {
private final class RedisConnection : EventedObject {

private {
string m_host;
Expand All @@ -96,18 +97,69 @@ final class RedisClient {
m_port = port;
}

private {
ubyte[][] argsToUbyte(ARGS...)(ARGS args) {
static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
foreach( i, T; ARGS ){
static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
static assert(i % 2 != 1 || isArray!T, "Values must be arrays.");
}
ubyte[][] ret;
foreach( i, arg; args) ret ~= cast(ubyte[])arg;
return ret;
override void acquire() {
if (m_conn && m_conn.connected)
m_conn.acquire();
}

override void release() {
if (m_conn && m_conn.connected)
m_conn.release();
}

override bool isOwner() {
return m_conn ? m_conn.isOwner() : true;
}

T request(T=RedisReply)(string command, in ubyte[][] args...) {
if( !m_conn || !m_conn.connected ){
m_conn = connectTcp(m_host, m_port);
}
m_conn.write(format("*%d\r\n$%d\r\n%s\r\n", args.length + 1, command.length, command));
foreach( arg; args ) {
m_conn.write(format("$%d\r\n", arg.length));
m_conn.write(arg);
m_conn.write("\r\n");
}
auto reply = new RedisReply(m_conn);
static if( is(T == bool) ) {
return reply.next!(ubyte[])()[0] == '1';
} else static if ( is(T == int) || is(T == long) || is(T == size_t) || is(T == double) ) {
auto str = reply.next!string();
return parse!T(str);
} else static if ( is(T == string) ) {
return cast(string)reply.next!T();
} else return reply;
}
}

/** A redis client with connection pooling. */
final class RedisClient {

private ConnectionPool!RedisConnection m_connections;

this() { }

/** Initializes the connection pool. */
void connect(string host = "127.0.0.1", ushort port = 6379) {
m_connections = new ConnectionPool!RedisConnection({
auto connection = new RedisConnection;
connection.connect(host, port);
return connection;
});
}

private static ubyte[][] argsToUbyte(ARGS...)(ARGS args) {
static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
foreach( i, T; ARGS ){
static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
static assert(i % 2 != 1 || isArray!T, "Values must be arrays.");
}
ubyte[][] ret;
foreach( i, arg; args) ret ~= cast(ubyte[])arg;
return ret;
}

size_t del(string[] keys...) {
return request!size_t("DEL", cast(ubyte[][])keys);
}
Expand Down Expand Up @@ -577,23 +629,6 @@ final class RedisClient {
//TODO sync

T request(T=RedisReply)(string command, in ubyte[][] args...) {
if( !m_conn /*|| !m_conn.connected*/ ){
m_conn = connectTcp(m_host, m_port);
}
m_conn.write(format("*%d\r\n$%d\r\n%s\r\n", args.length + 1, command.length, command));
foreach( arg; args ) {
m_conn.write(format("$%d\r\n", arg.length));
m_conn.write(arg);
m_conn.write("\r\n");
}
auto reply = new RedisReply(m_conn);
static if( is(T == bool) ) {
return reply.next!(ubyte[])()[0] == '1';
} else static if ( is(T == int) || is(T == long) || is(T == size_t) || is(T == double) ) {
auto str = reply.next!string();
return parse!T(str);
} else static if ( is(T == string) ) {
return cast(string)reply.next!T();
} else return reply;
return m_connections.lockConnection().request!T(command, args);
}
}
}