Skip to content

Commit

Permalink
Merge pull request #3 from Microsoft/nicr/fixes
Browse files Browse the repository at this point in the history
Nicr/fixes
  • Loading branch information
nriesland committed Apr 28, 2016
2 parents 6c9e1fa + f808afb commit 4f6df66
Showing 1 changed file with 58 additions and 71 deletions.
129 changes: 58 additions & 71 deletions Rserve/src/Rserv.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ typedef unsigned long rlen_t;
#endif
#endif

#define DEFAULT_MAX_CLIENTS 16
#define DEFAULT_MAX_CLIENTS 256
/* we have no configure for Win32 so we have to take care of socklen_t */
#ifdef Win32
#define WIN32_LEAN_AND_MEAN
Expand Down Expand Up @@ -419,7 +419,9 @@ wfork(int socket, char* parentCmdLine, int idx)
return -1;
}

return (int) (winPI[idx]).hProcess;
printf("create handles... Process = %d Thread = %d Socket = %d\n", (int)winPI[idx].hProcess, (int)winPI[idx].hThread, (int)winSocks[idx]);

return (int)(winPI[idx]).hProcess;
}

BOOL WINAPI ConsoleHandler(DWORD CEvent)
Expand Down Expand Up @@ -1343,7 +1345,7 @@ static int localonly = 1;

/* server socket */
static SOCKET ss;
static SOCKET cs;
//static SOCKET cs;

/* arguments structure passed to a working thread */
struct args {
Expand Down Expand Up @@ -3022,14 +3024,26 @@ void startThread(int connfd) {
}
#endif

int getIpAddress(int newfd) {
struct sockaddr_in addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
int res = getpeername(newfd, (struct sockaddr *)&addr, &addr_size);
char clientip[20];
strcpy(clientip, inet_ntoa(addr.sin_addr));
if( strcmp(clientip,"127.0.0.1") == 0)
return 1;
return 0;
}

void serverLoop() {
SOCKET cs;
#ifdef unix
int iret1;
#endif
SAIN ssa,cssa;
socklen_t al;
socklen_t al,cl;
int reuse;
struct args *sa;
struct args *sa,*ca;
struct sockaddr_in lsa;
int connfd = 0;
struct timeval timv;
Expand Down Expand Up @@ -3115,12 +3129,21 @@ void serverLoop() {
while(active) { /* main serving loop */
#ifdef Win32
nc = nextAvailableChild();
if(nc < 0) {
// The maximum number of child processes are engaged. Sleep to avoid
// spinning and then check for workers that have finished.
Sleep(500);
goto wait;
}
// Sleep to avoid spinning and then check for workers that have finished.
Sleep(100);
for (int jj = 0; jj<MAX_CLIENTS; ++jj){
if (winPI[jj].hProcess > 0) {
w = WaitForSingleObject(winPI[jj].hProcess, 0);
if (w == WAIT_OBJECT_0) {
printf("close handles... w = %d Process = %d Thread = %d Socket = %d\n", w, (int)winPI[jj].hProcess, (int)winPI[jj].hThread, (int)winSocks[jj]);
closesocket(winSocks[jj]);
winSocks[jj] = INVALID_SOCKET;
CloseHandle(winPI[jj].hProcess);
CloseHandle(winPI[jj].hThread);
winPI[jj].hProcess = 0;
}
}
}
#endif
#ifdef unix
// int maxfd = ss;
Expand Down Expand Up @@ -3297,83 +3320,46 @@ void serverLoop() {
} else
cp = cp->next;
}
} else if (selRet > 0 && FD_ISSET(cs,&readfds)) {
ca=(struct args*)malloc(sizeof(struct args));
memset(ca,0,sizeof(struct args));
al=sizeof(ca->sa);
#ifdef unix
if (localSocketName) {
cl=sizeof(ca->su);
ca->s=CF("accept",accept(cs,(SA*)&(ca->su),&cl));
} else
#endif
ca->s=CF("accept",(int)accept(cs,(SA*)&(ca->sa),&cl));
ca->ucix=UCIX++;
ca->ss=cs;
#ifdef Win32
ca->n = nc;
#endif
} else if (selRet > 0 && FD_ISSET(cs,&readfds)) {

printf("in cancel\n");
if (localonly && !localSocketName) {
char **laddr=allowed_ips;
int allowed=0;
if (!laddr) {
allowed_ips = (char**)malloc(sizeof(char*) * 2);
if (allowed_ips != 0)
{
allowed_ips[0] = _strdup("127.0.0.1");
allowed_ips[1] = 0;
laddr = allowed_ips;
}
}
if (laddr != 0) {
while (*laddr) if (ca->sa.sin_addr.s_addr == inet_addr(*(laddr++))) { allowed = 1; break; };
connfd = accept(cs, (struct sockaddr*)NULL, NULL);
int allowed = getIpAddress(connfd);
// int allowed=0;

if (allowed) {
connfd = accept(cs, (struct sockaddr*)NULL, NULL);
// connfd = accept(cs, (struct sockaddr*)NULL, NULL);
#ifdef unix
startThread(connfd);
closesocket(ca->s);
free(ca);
#else
startWinThread(connfd);
#endif
continue;
}
else
closesocket(ca->s);
}
} else {


} else { // remote enabled
#ifdef RSERV_DEBUG
printf(" just before cancel/ping\n");
#endif
connfd = accept(cs, (struct sockaddr*)NULL, NULL);
#ifdef unix
startThread(connfd);
closesocket(ca->s);
free(ca);
#else
startWinThread(connfd);
#endif
continue;
}
}
}

#endif
#ifdef Win32
} else if (selRet > 0 && FD_ISSET(cs,&readfds)) {
connfd = (int)accept(cs, (struct sockaddr*)NULL, NULL);
startWinThread(connfd);
continue;
}
#endif
#ifdef Win32
wait:
// Check a child processes that has exited, close his socket
// descriptor and flag him as available for use.
for(int jj=0;jj<MAX_CLIENTS;++jj){
w = WaitForSingleObject(winPI[jj].hProcess, 0);
if(w > -1 && w < MAX_CLIENTS) {
closesocket(winSocks[w]);
winSocks[w] = INVALID_SOCKET;
CloseHandle((HANDLE *) &winPI[jj]);
}
}
#endif

} // end of while(active)
}



extern int Rf_initEmbeddedR(int, char**);

/* main function - start Rserve */
Expand Down Expand Up @@ -3521,7 +3507,8 @@ int main(int argc, char **argv)
for (i = 0; i < MAX_CLIENTS; ++i)
{
winSocks[i] = INVALID_SOCKET;
}
winPI[i].hProcess = 0;
}

if (SetConsoleCtrlHandler(
(PHANDLER_ROUTINE)ConsoleHandler,TRUE)==FALSE)
Expand Down

0 comments on commit 4f6df66

Please sign in to comment.