Skip to content

Commit

Permalink
set read timeout to 5 sec (was 100ms and causing intermittent failure…
Browse files Browse the repository at this point in the history
…s when the machine is very busy)
  • Loading branch information
cktan committed Nov 21, 2019
1 parent 8c31ea4 commit ab5c6a7
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 58 deletions.
64 changes: 30 additions & 34 deletions client/c/s3pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,10 @@ static int check_reply(char* reply, char* errmsg, int errmsgsz)


static char* chat(int port, const char* request,
char* errmsg, int errmsgsz,
FILE* trace)
char* errmsg, int errmsgsz)
{
int sockfd = -1;
struct sockaddr_in servaddr;
int replysz = 0;
char* reply = 0;

// socket create and verification
Expand Down Expand Up @@ -148,49 +146,51 @@ static char* chat(int port, const char* request,
}

// read the reply
char* p = reply;
char* q = reply + replysz;
int top, max;
top = max = 0;
while (1) {

// always keep one extra byte slack for NUL term
if (p + 1 >= q) {
int newsz = replysz * 1.5;
if (newsz == 0) newsz = 10;

if (top == max) {
int newsz = max * 1.5;
if (newsz == 0) newsz = 1024;
char* t = realloc(reply, newsz);
if (!t) {
snprintf(errmsg, errmsgsz, "s3pool read: reply message too big -- out of memory");
goto bailout;
}
int n = p - reply;
reply = t;
replysz = newsz;
p = t + n;
q = reply + replysz;
max = newsz;
}

assert(p + 1 < q);
int n = read(sockfd, p, q - p - 1);
int n = read(sockfd, reply + top, max - top);
if (n == -1) {
if (errno == EAGAIN) continue;

snprintf(errmsg, errmsgsz, "s3pool read: %s", strerror(errno));
goto bailout;
}
p += n;
*p = 0; /* NUL */

if (trace) {
fprintf(trace, "\nNNN: %s\n", p - n);
}
top += n;
if (n == 0) break;
}
if (top == max) {
char* t = realloc(reply, max + 1);
if (!t) {
snprintf(errmsg, errmsgsz, "s3pool read: reply message too big -- out of memory");
goto bailout;
}
reply = t;
max = max + 1;
}
reply[top++] = 0; /* NUL */

close(sockfd);

if (trace) {
fprintf(trace, "\nSEND: %s\nRECEIVED: %s\n", request, reply);
if (top == 1) {
snprintf(errmsg, errmsgsz, "s3pool read: 0 bytes from server");
goto bailout;
}

close(sockfd);
sockfd = -1;

if (-1 == check_reply(reply, errmsg, errmsgsz)) {
goto bailout;
}
Expand Down Expand Up @@ -247,11 +247,7 @@ char* s3pool_pull_ex(int port, const char* bucket,
goto bailout;
}

char fname[100];
sprintf(fname, "/tmp/s3pool.%d.log", getpid());
FILE* fp = fopen(fname, "a");
reply = chat(port, request, errmsg, errmsgsz, fp);
fclose(fp);
reply = chat(port, request, errmsg, errmsgsz);
if (! reply) {
goto bailout;
}
Expand Down Expand Up @@ -293,7 +289,7 @@ int s3pool_push(int port, const char* bucket, const char* key, const char* fpath
goto bailout;
}

reply = chat(port, request, errmsg, errmsgsz, 0);
reply = chat(port, request, errmsg, errmsgsz);
if (! reply) {
goto bailout;
}
Expand Down Expand Up @@ -324,7 +320,7 @@ int s3pool_refresh(int port, const char* bucket,
goto bailout;
}

reply = chat(port, request, errmsg, errmsgsz, 0);
reply = chat(port, request, errmsg, errmsgsz);
if (! reply) {
goto bailout;
}
Expand Down Expand Up @@ -361,7 +357,7 @@ char* s3pool_glob(int port, const char* bucket, const char* pattern,
if (!request) {
goto bailout;
}
reply = chat(port, request, errmsg, errmsgsz, 0);
reply = chat(port, request, errmsg, errmsgsz);
if (! reply) {
goto bailout;
}
Expand Down
5 changes: 5 additions & 0 deletions src/s3pool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func checkdirs() {
// Callback function for each new request
func serve(c *tcp_server.Client, request string) {

if request == "" {
log.Println("Empty request or timed out reading request")
return
}

sendReply := func(status, reply string, elapsed int) {
// send network reply
c.Send(status)
Expand Down
46 changes: 28 additions & 18 deletions src/s3pool/op/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ import (

const _MAXWORKER = 20

/**
* Process N items using M go routines
*/
/**
* Process N items using M go routines
*/
func Pmap(processItem func(n int), N int, M int) {
if (M > N) {
if M > N {
M = N
}

Expand All @@ -39,17 +36,13 @@ func Pmap(processItem func(n int), N int, M int) {
} else {
fin = make(chan int, 10)
ticket = make(chan int, 10)
}
defer func() {
close(fin)
close(ticket)
}()
}

// let M go routines run concurrently
// let M workers run concurrently
for i := 0; i < M; i++ {
go func() {
for {
idx := <- ticket
idx := <-ticket
if idx == -1 {
return
}
Expand All @@ -59,24 +52,41 @@ func Pmap(processItem func(n int), N int, M int) {
}()
}

// send the jobs. Do this in a go routine so we don't have a
// race between ticket and fin
go func() {
// send the jobs
for i := 0; i < N; i++ {
ticket <- i
}
// send the terminate signal
for i := 0; i < M; i++ {
ticket <- -1
}
}()

// wait for all jobs to finish
for i := 0; i < N; i++ {
<-fin
}

}
// Go Channel Closing Principle:
// - Don't close a channel from the receiver side and
// - Don't close a channel if the channel has multiple concurrent senders.

// At this point, we gave out N tickets, and we received N fins.
// All workers must be blocked waiting for ticket at this time.
// They will next get the term signal and exit.

// Even though we are receiver on fin, we can close it making sure
// sure that no one will ever send to it again.
close(fin)

// Send the terminate signal. each worker will get the term
// signal and MUST NOT send to fin (it was closed above).
for i := 0; i < M; i++ {
ticket <- -1
}

// we are the only one sending on ticket, so it is always safe
// for us to close ticket.
close(ticket)
}

func Pull(args []string) (string, error) {
if len(args) < 2 {
Expand Down
9 changes: 3 additions & 6 deletions src/s3pool/tcp_server/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ type server struct {

// Read client data from channel
func (c *Client) accepted() {
defer c.conn.Close()
c.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
c.conn.SetReadDeadline(time.Now().Add(5 * time.Second))
reader := bufio.NewReader(c.conn)
req, _ := reader.ReadString('\n')
req = strings.Trim(req, " \n\t\r")
// ignore empty request
if req != "" {
c.Server.callback(c, req)
}
c.Server.callback(c, req)
c.conn.Close()
}

// Send text message to client
Expand Down

0 comments on commit ab5c6a7

Please sign in to comment.