Skip to content

Commit

Permalink
Implement socket barriers (shared between processes)
Browse files Browse the repository at this point in the history
When varnishtest creates a socket barrier, it will bind a socket and
listen to incoming connections. Once the number of expected connections
is open, connections are closed.

Barrier users only need to connect to the socket, read "nothing" and
block until the connection is closed. It allows virtually any process
to sync with varnishtest. The barrier will provide macros with its
socket information.
  • Loading branch information
dridi committed Mar 29, 2016
1 parent 0899861 commit db395f8
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 7 deletions.
64 changes: 64 additions & 0 deletions bin/varnishtest/tests/a00013.vtc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
varnishtest "Barrier operations"

# same as a00008.vtc, with socket barriers instead

# bs -> server, bc -> client, bb -> both
barrier bs sock 4
barrier bc sock 4
barrier bb sock 4 -cyclic

server s1 {
rxreq
barrier bs sync
barrier bb sync
delay .9
txresp
} -start

server s2 {
rxreq
barrier bs sync
barrier bb sync
delay .6
txresp
} -start

server s3 {
rxreq
barrier bs sync
barrier bb sync
delay .2
txresp
} -start

client c1 -connect ${s1_sock} {
delay .2
txreq
rxresp
barrier bc sync
barrier bb sync
} -start

client c2 -connect ${s2_sock} {
delay .6
txreq
rxresp
barrier bc sync
barrier bb sync
} -start

client c3 -connect ${s3_sock} {
delay .9
txreq
rxresp
barrier bc sync
barrier bb sync
} -start

# Wait for all servers to have received requests
barrier bs sync
barrier bb sync

# Wait for all clients to have received responses
barrier bc sync
barrier bb sync
180 changes: 173 additions & 7 deletions bin/varnishtest/vtc_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@

#include "config.h"

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/select.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "vtc.h"
#include "vtcp.h"

enum barrier_e {
BARRIER_NONE = 0,
Expand All @@ -55,6 +61,9 @@ struct barrier {
unsigned cyclic;

enum barrier_e type;
/* fields below are only for BARRIER_SOCK */
pthread_t thread;
volatile unsigned active;
};

static pthread_mutex_t barrier_mtx;
Expand Down Expand Up @@ -116,14 +125,122 @@ barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
b->type = BARRIER_COND;
}

static void *
barrier_sock_thread(void *priv)
{
struct barrier *b;
struct vtclog *vl;
struct timeval tmo;
const char *err;
char abuf[16], pbuf[6];
int i, sock, *conns;
fd_set rfds;

CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
assert(b->type == BARRIER_SOCK);

AZ(pthread_mutex_lock(&b->mtx));

vl = vtc_logopen(b->name);
AN(vl);

sock = VTCP_listen_on("127.0.0.1:0", NULL, b->expected, &err);
if (sock < 0) {
pthread_cond_signal(&b->cond);
AZ(pthread_mutex_unlock(&b->mtx));
vtc_log(vl, 0, "Barrier(%s) %s fails: %s (errno=%d)",
b->name, err, strerror(errno), errno);
}
assert(sock > 0);
(void)VTCP_nonblocking(sock);
VTCP_myname(sock, abuf, sizeof abuf, pbuf, sizeof pbuf);

macro_def(vl, b->name, "addr", "%s", abuf);
macro_def(vl, b->name, "port", "%s", pbuf);
macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);

pthread_cond_signal(&b->cond);
AZ(pthread_mutex_unlock(&b->mtx));

conns = calloc(b->expected, sizeof *conns);
AN(conns);

while (b->active) {
FD_ZERO(&rfds);
FD_SET(sock, &rfds);

tmo.tv_sec = 1;
tmo.tv_usec = 0;
i = select(sock + 1, &rfds, NULL, NULL, &tmo);
if (i == 0)
continue;
if (i < 0) {
if (errno == EINTR)
continue;
AZ(close(sock));
vtc_log(vl, 0,
"Barrier(%s) select fails: %s (errno=%d)",
b->name, strerror(errno), errno);
}
assert(i == 1);
assert(b->waiters <= b->expected);
if (b->waiters == b->expected)
vtc_log(vl, 0,
"Barrier(%s) use error: "
"more waiters than the %u expected",
b->name, b->expected);

i = accept(sock, NULL, NULL);
if (i < 0) {
AZ(close(sock));
vtc_log(vl, 0,
"Barrier(%s) accept fails: %s (errno=%d)",
b->name, strerror(errno), errno);
}

/* NB. We don't keep track of the established connections, only
* that connections were made to the barrier's socket.
*/
conns[b->waiters] = i;

if (++b->waiters < b->expected) {
vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
b->name, b->waiters, b->expected);
continue;
}

vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
for (i = 0; i < b->expected; i++)
AZ(close(conns[i]));

if (b->cyclic)
b->waiters = 0;
else
b->active = 0;
}

macro_undef(vl, b->name, "addr");
macro_undef(vl, b->name, "port");
macro_undef(vl, b->name, "sock");
AZ(close(sock));

return (NULL);
}

static void
barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
{

CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
barrier_expect(b, av, vl);
b->type = BARRIER_SOCK;
INCOMPL();
b->active = 1;

/* NB. We can use the BARRIER_COND's pthread_cond_t to wait until the
* socket is ready for convenience.
*/
AZ(pthread_create(&b->thread, NULL, barrier_sock_thread, b));
AZ(pthread_cond_wait(&b->cond, &b->mtx));
}

static void
Expand Down Expand Up @@ -174,6 +291,46 @@ barrier_cond_sync(struct barrier *b, struct vtclog *vl)
b->waiters = 0;
}

static void
barrier_sock_sync(struct barrier *b, struct vtclog *vl)
{
struct vsb *vsb;
const char *err;
char buf[32];
int i, sock;
ssize_t sz;

CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
assert(b->type == BARRIER_SOCK);

i = snprintf(buf, sizeof buf, "${%s_sock}", b->name);
assert(i > 0 && i < sizeof buf);
vsb = macro_expand(vl, buf);
vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);

sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
if (sock < 0)
vtc_log(vl, 0, "Barrier(%s) connection failed: %s",
b->name, err);

VSB_delete(vsb);

/* emulate pthread_cond_wait's behavior */
AZ(pthread_mutex_unlock(&b->mtx));
sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
AZ(pthread_mutex_lock(&b->mtx));

i = errno;
AZ(close(sock));

if (sz < 0)
vtc_log(vl, 0, "Barrier(%s) connection failed: %s (errno=%d)",
b->name, strerror(i), i);
if (sz > 0)
vtc_log(vl, 0, "Barrier(%s) unexpected data (%ldB)",
b->name, sz);
}

static void
barrier_sync(struct barrier *b, struct vtclog *vl)
{
Expand All @@ -188,7 +345,7 @@ barrier_sync(struct barrier *b, struct vtclog *vl)
barrier_cond_sync(b, vl);
break;
case BARRIER_SOCK:
INCOMPL();
barrier_sock_sync(b, vl);
break;
default:
WRONG("Wrong barrier type");
Expand All @@ -212,11 +369,20 @@ cmd_barrier(CMD_ARGS)
/* Reset and free */
VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
AZ(pthread_mutex_lock(&b->mtx));
assert(b->type != BARRIER_NONE);
if (b->cyclic)
AZ(b->waiters);
else
assert(b->waiters == b->expected);
switch (b->type) {
case BARRIER_COND:
if (b->cyclic)
AZ(b->waiters);
else
assert(b->waiters == b->expected);
break;
case BARRIER_SOCK:
b->active = 0;
AZ(pthread_join(b->thread, NULL));
break;
default:
WRONG("Wrong barrier type");
}
AZ(pthread_mutex_unlock(&b->mtx));
}
AZ(pthread_mutex_unlock(&barrier_mtx));
Expand Down

0 comments on commit db395f8

Please sign in to comment.