Skip to content

Commit

Permalink
redis: Add -async support
Browse files Browse the repository at this point in the history
Supports communication with redis as part of an event loop

Signed-off-by: Steve Bennett <[email protected]>
  • Loading branch information
msteveb committed Feb 13, 2023
1 parent a5ea6b0 commit aa18a0d
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 5 deletions.
30 changes: 30 additions & 0 deletions README.redis
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ If no message is received, the read command will wait forever.

The message is returned as: message <channel> <text>

The 'read' subcommand is also used in non-blocking mode. See the section below for more details.

The readable subcommand
~~~~~~~~~~~~~~~~~~~~~~~

Expand All @@ -133,3 +135,31 @@ The 'close' command is supported to close the connection.
This command is equivalent to deleting the command with:

rename $r ""

Async/Non-blocking support
~~~~~~~~~~~~~~~~~~~~~~~~~~

It is possible to connect to redis in non-blocking mode by using the '-async' flag. e.g.

set r [redis -async [socket stream localhost:6379]]

Now commands will return immediately with an empty result and 'read' in a 'readable' should
be used to retrieve the result. As a simple example:

$r readable {
set result [$r read]
if {$result ne ""} {
puts $result
incr next
}
}
$r SET x 5
vwait next
$r INCR x
vwait next

Note that if a large result is returned, 'read' may return an empty string, in
which case further calls to 'readable' are required to return the result.

In general the underlying socket should be put into non-blocking mode ($sock ndelay 1)
and a while loop should be used to read reponses until and empty result is returned.
62 changes: 62 additions & 0 deletions examples/redis-async.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env jimsh

# Testing redis client access in non-blocking mode

# Requires the redis extension
package require redis

# A redis server should be running either on localhost 6379
# or on the given address (e.g. host:port)
try {
lassign $argv addr
if {$addr eq ""} {
set addr localhost:6379
}
set s [socket stream $addr]
# socket must be in non-blocking mode
$s ndelay 1
set r [redis -async $s]
} on error msg {
puts [errorInfo $msg]
exit 1
}

# List of outstanding redis commands
set cmds {}

$r readable {
while {1} {
set result [$r -type read]
if {$result eq ""} {
break
}
set cmds [lassign $cmds cmd]
# Show command and response
puts "$cmd => $result"
}
}

# queue a command and remember it
proc redis_command {r args} {
global cmds
lappend cmds $args
$r {*}$args
}

redis_command $r SET zz 0

proc periodic {r} {
global counter done

if {[incr counter] > 10} {
incr done
} else {
redis_command $r INCR zz
after 100 periodic $r
}
}

set counter 0
periodic $r

vwait done
44 changes: 39 additions & 5 deletions jim-redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ static Jim_Obj *jim_redis_get_result(Jim_Interp *interp, redisReply *reply, int
return obj;
}

static int jim_redis_write_callback(Jim_Interp *interp, void *clientData, int mask)
{
redisContext *c = clientData;

int done;
if (redisBufferWrite(c, &done) != REDIS_OK) {
return JIM_ERR;
}
if (done) {
/* Write has completed, so remove the callback */
Jim_DeleteFileHandler(interp, c->fd, mask);
}
return JIM_OK;
}

/**
* $r readable ?script?
* - set or clear a readable script
Expand Down Expand Up @@ -111,7 +126,12 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
return Jim_DeleteCommand(interp, argv[0]);
}
if (Jim_CompareStringImmediate(interp, argv[1], "read")) {
if (redisGetReply(c, (void **)&reply) != REDIS_OK) {
int rc;
if (!(c->flags & REDIS_BLOCK)) {
redisBufferRead(c);
}
rc = redisGetReply(c, (void **)&reply);
if (rc != REDIS_OK) {
reply = NULL;
}
}
Expand All @@ -126,6 +146,13 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
reply = redisCommandArgv(c, nargs, args, arglens);
Jim_Free(args);
Jim_Free(arglens);
if (!(c->flags & REDIS_BLOCK)) {
int done;
if (redisBufferWrite(c, &done) == REDIS_OK && !done) {
/* Couldn't write the entire command, so set up a writable callback to complete the job */
Jim_CreateFileHandler(interp, c->fd, JIM_EVENT_WRITABLE, jim_redis_write_callback, c, NULL);
}
}
}
/* sometimes commands return NULL */
if (reply) {
Expand Down Expand Up @@ -164,14 +191,18 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
Jim_Obj *objv[2];
long fd;
int ret;
int async = 0;

if (argc != 2) {
Jim_WrongNumArgs(interp, 1, argv, "socket-stream");
if (argc > 2 && Jim_CompareStringImmediate(interp, argv[1], "-async")) {
async = 1;
}
if (argc - async != 2) {
Jim_WrongNumArgs(interp, 1, argv, "?-async? socket-stream");
return JIM_ERR;
}

/* Invoke getfd to get the file descriptor */
objv[0] = argv[1];
objv[0] = argv[1 + async];
objv[1] = Jim_NewStringObj(interp, "getfd", -1);
ret = Jim_EvalObjVector(interp, 2, objv);
if (ret == JIM_OK) {
Expand All @@ -186,10 +217,13 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
fd = dup(fd);
/* Can't fail */
c = redisConnectFd(fd);
if (async) {
c->flags &= ~REDIS_BLOCK;
}
/* Enable TCP_KEEPALIVE - this is the default for later redis versions */
redisEnableKeepAlive(c);
/* Now delete the original stream */
Jim_DeleteCommand(interp, argv[1]);
Jim_DeleteCommand(interp, argv[1 + async]);
snprintf(buf, sizeof(buf), "redis.handle%ld", Jim_GetId(interp));
Jim_CreateCommand(interp, buf, jim_redis_subcmd, c, jim_redis_del_proc);

Expand Down

0 comments on commit aa18a0d

Please sign in to comment.