Add control socket monitoring in polling

This commit is contained in:
z3APA3A 2020-12-04 19:31:25 +03:00
parent fa74b9a6e3
commit a09dc101d4
11 changed files with 99 additions and 91 deletions

View File

@ -1266,13 +1266,13 @@ unsigned long udpresolve(int af, char * name, char * value, unsigned *retttl, st
len+=2;
}
if(socksendto(sock, (struct sockaddr *)sinsr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000) != len){
if(socksendto(sock, (struct sockaddr *)sinsr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000, param?param->monitorsock:NULL, param?param->monaction:0) != len){
so._shutdown(sock, SHUT_RDWR);
so._closesocket(sock);
continue;
}
if(param) param->statscli64 += len;
len = sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf, 4096, conf.timeouts[DNS_TO]*1000);
len = sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf, 4096, conf.timeouts[DNS_TO]*1000, param?param->monitorsock:NULL, param?param->monaction:0);
so._shutdown(sock, SHUT_RDWR);
so._closesocket(sock);
if(len <= 13) {
@ -1284,7 +1284,7 @@ unsigned long udpresolve(int af, char * name, char * value, unsigned *retttl, st
us = ntohs(*(unsigned short*)buf);
len-=2;
buf+=2;
if(us > 4096 || us < len || (us > len && sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf+len, us-len, conf.timeouts[DNS_TO]*1000) != us-len)) {
if(us > 4096 || us < len || (us > len && sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf+len, us-len, conf.timeouts[DNS_TO]*1000, param?param->monitorsock:NULL, param?param->monaction:0) != us-len)) {
continue;
}
}

View File

@ -153,12 +153,12 @@ void * dnsprchild(struct clientparam* param) {
#endif
}
if(socksendto(param->remsock, (struct sockaddr *)&param->sinsr, (char *)buf, i, conf.timeouts[SINGLEBYTE_L]*1000) != i){
if(socksendto(param->remsock, (struct sockaddr *)&param->sinsr, (char *)buf, i, conf.timeouts[SINGLEBYTE_L]*1000, NULL, 0) != i){
RETURN(820);
}
param->statscli64 += i;
param->nwrites++;
len = sockrecvfrom(param->remsock, (struct sockaddr *)&param->sinsr, (char *)buf, BUFSIZE, conf.timeouts[DNS_TO]*1000);
len = sockrecvfrom(param->remsock, (struct sockaddr *)&param->sinsr, (char *)buf, BUFSIZE, conf.timeouts[DNS_TO]*1000, NULL, 0);
if(len <= 13) {
RETURN(821);
}
@ -174,7 +174,7 @@ void * dnsprchild(struct clientparam* param) {
if(len != us) RETURN(832);
}
if(buf[6] || buf[7]){
if(socksendto(param->clisock, (struct sockaddr *)&param->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000) != len){
if(socksendto(param->clisock, (struct sockaddr *)&param->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000, NULL, 0) != len){
RETURN(822);
}
RETURN(0);
@ -185,7 +185,7 @@ void * dnsprchild(struct clientparam* param) {
buf[2] = 0x85;
buf[3] = 0x83;
}
res = socksendto(param->clisock, (struct sockaddr *)&param->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000);
res = socksendto(param->clisock, (struct sockaddr *)&param->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000, NULL, 0);
if(res != len){RETURN(819);}
if(!ip) {RETURN(888);}

View File

@ -250,7 +250,10 @@ void * ftpprchild(struct clientparam* param) {
so._setsockopt(param->remsock, SOL_SOCKET, SO_LINGER, (char *)&lg, sizeof(lg));
so._setsockopt(clidatasock, SOL_SOCKET, SO_LINGER, (char *)&lg, sizeof(lg));
param->clisock = clidatasock;
param->monitorsock = &param->ctrlsock;
param->monaction = 0;
res = mapsocket(param, conf.timeouts[CONNECTION_S]);
param->monitorsock = NULL;
if(param->remsock != INVALID_SOCKET) {
so._shutdown (param->remsock, SHUT_RDWR);
so._closesocket(param->remsock);

View File

@ -225,6 +225,7 @@ void * proxychild(struct clientparam* param) {
SOCKET ftps;
char ftpbuf[FTPBUFSIZE];
int inftpbuf = 0;
int haveconnection = 0;
#ifndef WITHMAIN
FILTER_ACTION action;
#endif
@ -232,6 +233,7 @@ void * proxychild(struct clientparam* param) {
if(param->remsock != INVALID_SOCKET) haveconnection = 1;
if(!(buf = myalloc(BUFSIZE))) {RETURN(21);}
bufsize = BUFSIZE;
anonymous = param->srv->anonymous;
@ -240,34 +242,13 @@ for(;;){
inbuf = 0;
if(keepalive && (param->cliinbuf == param->clioffset) && (param->remsock != INVALID_SOCKET)){
memset(fds, 0, sizeof(fds));
fds[0].fd = param->clisock;
fds[0].events = POLLIN;
fds[1].fd = param->remsock;
fds[1].events = POLLIN;
res = so._poll(fds, 2, conf.timeouts[STRING_S]*1000);
if(res<=0) {
RETURN(555);
}
if((fds[1].revents & (POLLIN|POLLHUP|POLLERR|POLLNVAL))) {
if(param->transparent || (!param->redirected && param->redirtype == R_HTTP)) RETURN(555);
ckeepalive = 0;
so._shutdown(param->remsock, SHUT_RDWR);
so._closesocket(param->remsock);
param->remsock = INVALID_SOCKET;
param->redirected = 0;
param->redirtype = 0;
memset(&param->sinsl, 0, sizeof(param->sinsl));
memset(&param->sinsr, 0, sizeof(param->sinsr));
memset(&param->req, 0, sizeof(param->req));
}
}
if(param->remsock != INVALID_SOCKET)param->monitorsock = &param->remsock;
param->monaction = haveconnection? 0 : INVALID_SOCKET;
i = sockgetlinebuf(param, CLIENT, buf, LINESIZE - 1, '\n', conf.timeouts[STRING_L]);
if(i<=0) {
RETURN((keepalive)?555:(i)?507:508);
}
param->monaction = INVALID_SOCKET;
if (i==2 && buf[0]=='\r' && buf[1]=='\n') continue;
buf[i] = 0;
@ -279,11 +260,6 @@ for(;;){
so._closesocket(param->remsock);
}
param->remsock = INVALID_SOCKET;
param->redirected = 0;
param->redirtype = 0;
memset(&param->sinsl, 0, sizeof(param->sinsl));
memset(&param->sinsr, 0, sizeof(param->sinsr));
memset(&param->req, 0, sizeof(param->req));
}
myfree(req);
}
@ -349,6 +325,7 @@ for(;;){
else param->operation = HTTP_OTHER;
do {
buf[inbuf+i]=0;
/*printf("Got: %s\n", buf+inbuf);*/
#ifndef WITHMAIN
if(i > 25 && !param->srv->transparent && (!strncasecmp((char *)(buf+inbuf), "proxy-authorization", 19))){
@ -471,6 +448,8 @@ for(;;){
reqsize = (int)strlen((char *)req);
reqbufsize = reqsize + 1;
#ifndef WITHMAIN
action = handlereqfilters(param, &req, &reqbufsize, 0, &reqsize);
@ -511,6 +490,18 @@ for(;;){
#endif
param->monitorsock = NULL;
if(param->remsock == INVALID_SOCKET){
if(haveconnection) RETURN(555);
param->redirected = 0;
param->redirtype = 0;
memset(&param->sinsl, 0, sizeof(param->sinsl));
memset(&param->sinsr, 0, sizeof(param->sinsr));
memset(&param->req, 0, sizeof(param->req));
}
if(param->srv->needuser > 1 && !param->username) {RETURN(4);}
if((res = (*param->srv->authfunc)(param))) {RETURN(res);}

View File

@ -154,8 +154,8 @@ extern struct extparam conf;
int sockmap(struct clientparam * param, int timeo, int usesplice);
int socksend(SOCKET sock, char * buf, int bufsize, int tosec);
int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec);
int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec);
int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec, SOCKET* monsock, int monaction);
int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec, SOCKET* monsock, int monaction);
int sockgetcharcli(struct clientparam * param, int timeosec, int timeousec);

View File

@ -697,7 +697,7 @@ int MODULEMAINFUNC (int argc, char** argv){
continue;
}
if(sockrecvfrom(new_sock,(struct sockaddr*)&defparam.sincr,buf,1,60*1000) != 1 || *buf!='C') {
if(sockrecvfrom(new_sock,(struct sockaddr*)&defparam.sincr,buf,1,60*1000,NULL,0) != 1 || *buf!='C') {
so._closesocket(new_sock);
new_sock = INVALID_SOCKET;
usleep(SLEEPTIME);
@ -891,7 +891,7 @@ void srvinit(struct srvparam * srv, struct clientparam *param){
param->srv = srv;
param->version = srv->version;
param->paused = srv->paused;
param->remsock = param->clisock = param->ctrlsock = param->ctrlsocksrv = param->monitorsock = INVALID_SOCKET;
param->remsock = param->clisock = param->ctrlsock = param->ctrlsocksrv = INVALID_SOCKET;
*SAFAMILY(&param->req) = *SAFAMILY(&param->sinsl) = *SAFAMILY(&param->sinsr) = *SAFAMILY(&param->sincr) = *SAFAMILY(&param->sincl) = AF_INET;
pthread_mutex_init(&srv->counter_mutex, NULL);
srv->intsa = conf.intsa;

View File

@ -7,45 +7,32 @@
#include "proxy.h"
int socksend(SOCKET sock, char * buf, int bufsize, int to){
int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET *monitorsock, int monaction){
int sent = 0;
int res;
struct pollfd fds={0};
struct pollfd fds[2]={{0},{0}};
fds.fd = sock;
do {
res = so._send(sock, (char *)buf + sent, bufsize - sent, 0);
if(res <= 0) {
if(errno != EAGAIN && errno != EINTR) break;
fds.events = POLLOUT;
fds.fd = sock;
if(conf.timetoexit) return sent;
res = so._poll(&fds, 1, to*1000);
if(res < 1 && errno != EAGAIN && errno != EINTR) break;
continue;
}
sent += res;
} while (sent < bufsize);
return sent;
}
int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to){
int sent = 0;
int res;
struct pollfd fds={0};
fds.fd = sock;
fds[0].fd = sock;
if(monitorsock)fds[1].fd = *monitorsock;
do {
if(conf.timetoexit) return 0;
res = so._sendto(sock, (char *)buf + sent, bufsize - sent, 0, sin, SASIZE(sin));
res = sin?so._sendto(sock, (char *)buf + sent, bufsize - sent, 0, sin, SASIZE(sin)):so._send(sock, (char *)buf + sent, bufsize - sent, 0);
if(res < 0) {
if(errno != EAGAIN && errno != EINTR) break;
fds.events = POLLOUT;
fds[0].events = POLLOUT;
fds[1].events = POLLIN;
if(conf.timetoexit) return sent;
res = so._poll(&fds, 1, to);
res = so._poll(fds, monitorsock?2:1, to);
if(res < 0 && (errno == EAGAIN || errno == EINTR)) continue;
if(res < 1) break;
if(monitorsock && fds[1].revents){
if(monaction == INVALID_SOCKET){
so._closesocket(*monitorsock);
*monitorsock = INVALID_SOCKET;
monitorsock = NULL;
}
else return monaction;
}
res = 0;
}
sent += res;
@ -53,22 +40,36 @@ int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int
return sent;
}
int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to){
struct pollfd fds={0};
int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET *monitorsock, int monaction){
struct pollfd fds[2]={{0},{0}};
SASIZETYPE sasize;
int res;
fds.fd = sock;
fds[0].fd = sock;
if(monitorsock)fds[1].fd = *monitorsock;
do {
if(monitorsock && fds[1].revents){
if(monaction == INVALID_SOCKET){
so._closesocket(*monitorsock);
*monitorsock = INVALID_SOCKET;
monitorsock = NULL;
}
else return monaction;
}
sasize = SASIZE(sin);
res = so._recvfrom(sock, (char *)buf, bufsize, 0, (struct sockaddr *)sin, &sasize);
if ((res >= 0) || (errno != EAGAIN && errno != EINTR) || conf.timetoexit) break;
fds.events = POLLIN;
res = so._poll(&fds, 1, to);
fds[0].events = POLLIN;
fds[0].events = POLLIN;
res = so._poll(fds, monitorsock?2:1, to);
} while (res == 1 || (res < 0 && (errno == EAGAIN || errno == EINTR)));
return res;
}
int socksend(SOCKET sock, char * buf, int bufsize, int to){
return socksendto(sock, NULL, buf, bufsize, to*1000, NULL, 0);
}
int sockgetcharcli(struct clientparam * param, int timeosec, int timeousec){
int len;
@ -81,7 +82,7 @@ int sockgetcharcli(struct clientparam * param, int timeosec, int timeousec){
return (int)param->clibuf[param->clioffset++];
}
param->clioffset = param->cliinbuf = 0;
if ((len = sockrecvfrom(param->clisock, (struct sockaddr *)&param->sincr, param->clibuf, param->clibufsize, timeosec*1000 + timeousec))<=0) return EOF;
if ((len = sockrecvfrom(param->clisock, (struct sockaddr *)&param->sincr, param->clibuf, param->clibufsize, timeosec*1000 + timeousec, param->monitorsock, param->monaction))<=0) return EOF;
param->cliinbuf = len;
param->clioffset = 1;
return (int)*param->clibuf;
@ -101,7 +102,7 @@ int sockfillbuffcli(struct clientparam * param, unsigned long size, int timeosec
}
if(size <= param->cliinbuf) return size;
size -= param->cliinbuf;
if((len = sockrecvfrom(param->clisock, (struct sockaddr *)&param->sincr, param->clibuf + param->cliinbuf, (param->clibufsize - param->cliinbuf) < size? param->clibufsize - param->cliinbuf:size, timeosec*1000)) > 0){
if((len = sockrecvfrom(param->clisock, (struct sockaddr *)&param->sincr, param->clibuf + param->cliinbuf, (param->clibufsize - param->cliinbuf) < size? param->clibufsize - param->cliinbuf:size, timeosec*1000, param->monitorsock, param->monaction)) > 0){
param->cliinbuf += len;
}
return param->cliinbuf;
@ -121,7 +122,7 @@ int sockfillbuffsrv(struct clientparam * param, unsigned long size, int timeosec
}
if(size <= param->srvinbuf) return size;
size -= param->srvinbuf;
if((len = sockrecvfrom(param->remsock, (struct sockaddr *)&param->sinsr, param->srvbuf + param->srvinbuf, (param->srvbufsize - param->srvinbuf) < size? param->srvbufsize - param->srvinbuf:size, timeosec*1000)) > 0){
if((len = sockrecvfrom(param->remsock, (struct sockaddr *)&param->sinsr, param->srvbuf + param->srvinbuf, (param->srvbufsize - param->srvinbuf) < size? param->srvbufsize - param->srvinbuf:size, timeosec*1000, param->monitorsock, param->monaction)) > 0){
param->srvinbuf += len;
param->nreads++;
param->statssrv64 += len;
@ -146,7 +147,7 @@ int sockgetcharsrv(struct clientparam * param, int timeosec, int timeousec){
return (int)param->srvbuf[param->srvoffset++];
}
param->srvoffset = param->srvinbuf = 0;
if ((len = sockrecvfrom(param->remsock, (struct sockaddr *)&param->sinsr, param->srvbuf, param->srvbufsize, timeosec*1000 + timeousec))<=0) return EOF;
if ((len = sockrecvfrom(param->remsock, (struct sockaddr *)&param->sinsr, param->srvbuf, param->srvbufsize, timeosec*1000 + timeousec, param->monitorsock, param->monaction))<=0) return EOF;
param->srvinbuf = len;
param->srvoffset = 1;
param->nreads++;

View File

@ -167,11 +167,12 @@ log(logbuf);
memset(fds, 0, sizeof(fds));
fds[0].fd = param->clisock;
fds[1].fd = param->remsock;
fds[2].fd = param->monitorsock;
if(param->monitorsock)fds[2].fd = *param->monitorsock;
fds[2].events = POLLIN;
if(param->monitorsock != INVALID_SOCKET)
so._poll(fds, param->monitorsock != INVALID_SOCKET? 3:2, sleeptime);
if(fds[2].revents)RETURN (93);
so._poll(fds, param->monitorsock? 3:2, sleeptime);
if(fds[2].revents){
RETURN (93);
}
sleeptime = 0;
}
if((param->srv->logdumpsrv && (param->statssrv64 > param->srv->logdumpsrv)) ||
@ -440,13 +441,20 @@ log("done read from server to buf");
if(!after){
memset(fds, 0, sizeof(fds));
}
if(param->monitorsock != INVALID_SOCKET){
if(param->monitorsock){
if(!after){
fds[fdsc].fd = param->monitorsock;
fds[fdsc].fd = *param->monitorsock;
fds[fdsc].events = POLLIN;
fdsc++;
}
else if(fds[fdsc].revents) RETURN(90);
else if(fds[fdsc].revents) {
if(param->monaction == INVALID_SOCKET){
so._closesocket(*param->monitorsock);
*param->monitorsock = INVALID_SOCKET;
param->monitorsock = NULL;
}
else RETURN(param->monaction);
}
}
if(!CLIENTTERM){
if(!after){

View File

@ -398,7 +398,7 @@ fflush(stderr);
sasize = sizeof(param->sinsr);
if(len > (int)i){
socksendto(param->remsock, (struct sockaddr *)&param->sinsr, buf+i, len - i, conf.timeouts[SINGLEBYTE_L]*1000);
socksendto(param->remsock, (struct sockaddr *)&param->sinsr, buf+i, len - i, conf.timeouts[SINGLEBYTE_L]*1000, &param->ctrlsock, INVALID_SOCKET);
param->statscli64+=(len - i);
param->nwrites++;
#if SOCKSTRACE > 1
@ -415,6 +415,7 @@ fprintf(stderr, "client address is assumed to be %s:%hu\n",
fflush(stderr);
#endif
}
if(param->ctrlsock == INVALID_SOCKET) break;
continue;
}
else if(len ==0 || (len <0 && errno != EINTR && errno != EAGAIN)){
@ -446,7 +447,7 @@ fflush(stderr);
memcpy(buf+4, SAADDR(&param->sinsr), SAADDRLEN(&param->sinsr));
memcpy(buf+4+SAADDRLEN(&param->sinsr), SAPORT(&param->sinsr), 2);
sasize = sizeof(sin);
socksendto(param->clisock, (struct sockaddr *)&sin, buf, len + 6 + SAADDRLEN(&param->sinsr), conf.timeouts[SINGLEBYTE_L]*1000);
socksendto(param->clisock, (struct sockaddr *)&sin, buf, len + 6 + SAADDRLEN(&param->sinsr), conf.timeouts[SINGLEBYTE_L]*1000, &param->ctrlsock, INVALID_SOCKET);
#if SOCKSTRACE > 1
fprintf(stderr, "UDP packet relayed to client from %hu size %d\n",
ntohs(*SAPORT(&param->sinsr)),
@ -455,6 +456,7 @@ fprintf(stderr, "UDP packet relayed to client from %hu size %d\n",
fflush(stderr);
#endif
if(param->ctrlsock == INVALID_SOCKET) break;
continue;
}
fds[0].events = POLLIN;

View File

@ -528,8 +528,11 @@ struct clientparam {
SOCKET clisock,
remsock,
ctrlsock,
ctrlsocksrv, monitorsock;
ctrlsocksrv;
SOCKET * monitorsock;
int monaction;
PROXYSERVICE service;
REDIRTYPE redirtype;
@ -765,8 +768,8 @@ struct pluginlink {
struct commands * commandhandlers;
void * (*findbyname)(const char *name);
int (*socksend)(SOCKET sock, char * buf, int bufsize, int to);
int (*socksendto)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to);
int (*sockrecvfrom)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to);
int (*socksendto)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET monsock, int monaction);
int (*sockrecvfrom)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET monsock, int monaction);
int (*sockgetcharcli)(struct clientparam * param, int timeosec, int timeousec);
int (*sockgetcharsrv)(struct clientparam * param, int timeosec, int timeousec);
int (*sockgetlinebuf)(struct clientparam * param, DIRECTION which, char * buf, int bufsize, int delim, int to);

View File

@ -48,7 +48,7 @@ void * udppmchild(struct clientparam* param) {
RETURN (21);
}
param->cliinbuf = param->clioffset = 0;
i = sockrecvfrom(param->srv->srvsock, (struct sockaddr *)&param->sincr, param->clibuf, param->clibufsize, 0);
i = sockrecvfrom(param->srv->srvsock, (struct sockaddr *)&param->sincr, param->clibuf, param->clibufsize, 0, NULL, 0);
if(i<=0){
param->srv->fds.events = POLLIN;
RETURN (214);