From a09dc101d479c414f1a6db6a76dfb416f4ab9478 Mon Sep 17 00:00:00 2001 From: z3APA3A <3APA3A@3proxy.ru> Date: Fri, 4 Dec 2020 19:31:25 +0300 Subject: [PATCH] Add control socket monitoring in polling --- src/auth.c | 6 ++-- src/dnspr.c | 8 ++--- src/ftppr.c | 3 ++ src/proxy.c | 49 ++++++++++++------------------ src/proxy.h | 4 +-- src/proxymain.c | 4 +-- src/sockgetchar.c | 77 ++++++++++++++++++++++++----------------------- src/sockmap.c | 22 +++++++++----- src/socks.c | 6 ++-- src/structures.h | 9 ++++-- src/udppm.c | 2 +- 11 files changed, 99 insertions(+), 91 deletions(-) diff --git a/src/auth.c b/src/auth.c index f795ad2..a297a05 100644 --- a/src/auth.c +++ b/src/auth.c @@ -1266,13 +1266,13 @@ unsigned long udpresolve(int af, char * name, char * value, unsigned *retttl, st len+=2; } - if(socksendto(sock, (struct sockaddr *)sinsr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000) != len){ + if(socksendto(sock, (struct sockaddr *)sinsr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000, param?param->monitorsock:NULL, param?param->monaction:0) != len){ so._shutdown(sock, SHUT_RDWR); so._closesocket(sock); continue; } if(param) param->statscli64 += len; - len = sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf, 4096, conf.timeouts[DNS_TO]*1000); + len = sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf, 4096, conf.timeouts[DNS_TO]*1000, param?param->monitorsock:NULL, param?param->monaction:0); so._shutdown(sock, SHUT_RDWR); so._closesocket(sock); if(len <= 13) { @@ -1284,7 +1284,7 @@ unsigned long udpresolve(int af, char * name, char * value, unsigned *retttl, st us = ntohs(*(unsigned short*)buf); len-=2; buf+=2; - if(us > 4096 || us < len || (us > len && sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf+len, us-len, conf.timeouts[DNS_TO]*1000) != us-len)) { + if(us > 4096 || us < len || (us > len && sockrecvfrom(sock, (struct sockaddr *)sinsr, (char *)buf+len, us-len, conf.timeouts[DNS_TO]*1000, param?param->monitorsock:NULL, param?param->monaction:0) != us-len)) { continue; } } diff --git a/src/dnspr.c b/src/dnspr.c index a8a37e1..6a7ef07 100644 --- a/src/dnspr.c +++ b/src/dnspr.c @@ -153,12 +153,12 @@ void * dnsprchild(struct clientparam* param) { #endif } - if(socksendto(param->remsock, (struct sockaddr *)¶m->sinsr, (char *)buf, i, conf.timeouts[SINGLEBYTE_L]*1000) != i){ + if(socksendto(param->remsock, (struct sockaddr *)¶m->sinsr, (char *)buf, i, conf.timeouts[SINGLEBYTE_L]*1000, NULL, 0) != i){ RETURN(820); } param->statscli64 += i; param->nwrites++; - len = sockrecvfrom(param->remsock, (struct sockaddr *)¶m->sinsr, (char *)buf, BUFSIZE, conf.timeouts[DNS_TO]*1000); + len = sockrecvfrom(param->remsock, (struct sockaddr *)¶m->sinsr, (char *)buf, BUFSIZE, conf.timeouts[DNS_TO]*1000, NULL, 0); if(len <= 13) { RETURN(821); } @@ -174,7 +174,7 @@ void * dnsprchild(struct clientparam* param) { if(len != us) RETURN(832); } if(buf[6] || buf[7]){ - if(socksendto(param->clisock, (struct sockaddr *)¶m->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000) != len){ + if(socksendto(param->clisock, (struct sockaddr *)¶m->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000, NULL, 0) != len){ RETURN(822); } RETURN(0); @@ -185,7 +185,7 @@ void * dnsprchild(struct clientparam* param) { buf[2] = 0x85; buf[3] = 0x83; } - res = socksendto(param->clisock, (struct sockaddr *)¶m->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000); + res = socksendto(param->clisock, (struct sockaddr *)¶m->sincr, (char *)buf, len, conf.timeouts[SINGLEBYTE_L]*1000, NULL, 0); if(res != len){RETURN(819);} if(!ip) {RETURN(888);} diff --git a/src/ftppr.c b/src/ftppr.c index 0b92e43..7a804f1 100644 --- a/src/ftppr.c +++ b/src/ftppr.c @@ -250,7 +250,10 @@ void * ftpprchild(struct clientparam* param) { so._setsockopt(param->remsock, SOL_SOCKET, SO_LINGER, (char *)&lg, sizeof(lg)); so._setsockopt(clidatasock, SOL_SOCKET, SO_LINGER, (char *)&lg, sizeof(lg)); param->clisock = clidatasock; + param->monitorsock = ¶m->ctrlsock; + param->monaction = 0; res = mapsocket(param, conf.timeouts[CONNECTION_S]); + param->monitorsock = NULL; if(param->remsock != INVALID_SOCKET) { so._shutdown (param->remsock, SHUT_RDWR); so._closesocket(param->remsock); diff --git a/src/proxy.c b/src/proxy.c index a89edd6..d16eadc 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -225,6 +225,7 @@ void * proxychild(struct clientparam* param) { SOCKET ftps; char ftpbuf[FTPBUFSIZE]; int inftpbuf = 0; + int haveconnection = 0; #ifndef WITHMAIN FILTER_ACTION action; #endif @@ -232,6 +233,7 @@ void * proxychild(struct clientparam* param) { + if(param->remsock != INVALID_SOCKET) haveconnection = 1; if(!(buf = myalloc(BUFSIZE))) {RETURN(21);} bufsize = BUFSIZE; anonymous = param->srv->anonymous; @@ -240,34 +242,13 @@ for(;;){ inbuf = 0; - if(keepalive && (param->cliinbuf == param->clioffset) && (param->remsock != INVALID_SOCKET)){ - memset(fds, 0, sizeof(fds)); - fds[0].fd = param->clisock; - fds[0].events = POLLIN; - fds[1].fd = param->remsock; - fds[1].events = POLLIN; - res = so._poll(fds, 2, conf.timeouts[STRING_S]*1000); - if(res<=0) { - RETURN(555); - } - if((fds[1].revents & (POLLIN|POLLHUP|POLLERR|POLLNVAL))) { - if(param->transparent || (!param->redirected && param->redirtype == R_HTTP)) RETURN(555); - ckeepalive = 0; - so._shutdown(param->remsock, SHUT_RDWR); - so._closesocket(param->remsock); - param->remsock = INVALID_SOCKET; - param->redirected = 0; - param->redirtype = 0; - memset(¶m->sinsl, 0, sizeof(param->sinsl)); - memset(¶m->sinsr, 0, sizeof(param->sinsr)); - memset(¶m->req, 0, sizeof(param->req)); - } - } - + if(param->remsock != INVALID_SOCKET)param->monitorsock = ¶m->remsock; + param->monaction = haveconnection? 0 : INVALID_SOCKET; i = sockgetlinebuf(param, CLIENT, buf, LINESIZE - 1, '\n', conf.timeouts[STRING_L]); if(i<=0) { RETURN((keepalive)?555:(i)?507:508); } + param->monaction = INVALID_SOCKET; if (i==2 && buf[0]=='\r' && buf[1]=='\n') continue; buf[i] = 0; @@ -279,11 +260,6 @@ for(;;){ so._closesocket(param->remsock); } param->remsock = INVALID_SOCKET; - param->redirected = 0; - param->redirtype = 0; - memset(¶m->sinsl, 0, sizeof(param->sinsl)); - memset(¶m->sinsr, 0, sizeof(param->sinsr)); - memset(¶m->req, 0, sizeof(param->req)); } myfree(req); } @@ -349,6 +325,7 @@ for(;;){ else param->operation = HTTP_OTHER; do { buf[inbuf+i]=0; + /*printf("Got: %s\n", buf+inbuf);*/ #ifndef WITHMAIN if(i > 25 && !param->srv->transparent && (!strncasecmp((char *)(buf+inbuf), "proxy-authorization", 19))){ @@ -471,6 +448,8 @@ for(;;){ reqsize = (int)strlen((char *)req); reqbufsize = reqsize + 1; + + #ifndef WITHMAIN action = handlereqfilters(param, &req, &reqbufsize, 0, &reqsize); @@ -511,6 +490,18 @@ for(;;){ #endif + + param->monitorsock = NULL; + if(param->remsock == INVALID_SOCKET){ + if(haveconnection) RETURN(555); + param->redirected = 0; + param->redirtype = 0; + memset(¶m->sinsl, 0, sizeof(param->sinsl)); + memset(¶m->sinsr, 0, sizeof(param->sinsr)); + memset(¶m->req, 0, sizeof(param->req)); + } + + if(param->srv->needuser > 1 && !param->username) {RETURN(4);} if((res = (*param->srv->authfunc)(param))) {RETURN(res);} diff --git a/src/proxy.h b/src/proxy.h index ccb7f91..01018e0 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -154,8 +154,8 @@ extern struct extparam conf; int sockmap(struct clientparam * param, int timeo, int usesplice); int socksend(SOCKET sock, char * buf, int bufsize, int tosec); -int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec); -int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec); +int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec, SOCKET* monsock, int monaction); +int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int tomsec, SOCKET* monsock, int monaction); int sockgetcharcli(struct clientparam * param, int timeosec, int timeousec); diff --git a/src/proxymain.c b/src/proxymain.c index 2c3323c..ad38f8d 100644 --- a/src/proxymain.c +++ b/src/proxymain.c @@ -697,7 +697,7 @@ int MODULEMAINFUNC (int argc, char** argv){ continue; } - if(sockrecvfrom(new_sock,(struct sockaddr*)&defparam.sincr,buf,1,60*1000) != 1 || *buf!='C') { + if(sockrecvfrom(new_sock,(struct sockaddr*)&defparam.sincr,buf,1,60*1000,NULL,0) != 1 || *buf!='C') { so._closesocket(new_sock); new_sock = INVALID_SOCKET; usleep(SLEEPTIME); @@ -891,7 +891,7 @@ void srvinit(struct srvparam * srv, struct clientparam *param){ param->srv = srv; param->version = srv->version; param->paused = srv->paused; - param->remsock = param->clisock = param->ctrlsock = param->ctrlsocksrv = param->monitorsock = INVALID_SOCKET; + param->remsock = param->clisock = param->ctrlsock = param->ctrlsocksrv = INVALID_SOCKET; *SAFAMILY(¶m->req) = *SAFAMILY(¶m->sinsl) = *SAFAMILY(¶m->sinsr) = *SAFAMILY(¶m->sincr) = *SAFAMILY(¶m->sincl) = AF_INET; pthread_mutex_init(&srv->counter_mutex, NULL); srv->intsa = conf.intsa; diff --git a/src/sockgetchar.c b/src/sockgetchar.c index 7a22667..dce6f5e 100644 --- a/src/sockgetchar.c +++ b/src/sockgetchar.c @@ -7,45 +7,32 @@ #include "proxy.h" -int socksend(SOCKET sock, char * buf, int bufsize, int to){ +int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET *monitorsock, int monaction){ int sent = 0; int res; - struct pollfd fds={0}; + struct pollfd fds[2]={{0},{0}}; - fds.fd = sock; - do { - res = so._send(sock, (char *)buf + sent, bufsize - sent, 0); - if(res <= 0) { - if(errno != EAGAIN && errno != EINTR) break; - fds.events = POLLOUT; - fds.fd = sock; - if(conf.timetoexit) return sent; - res = so._poll(&fds, 1, to*1000); - if(res < 1 && errno != EAGAIN && errno != EINTR) break; - continue; - } - sent += res; - } while (sent < bufsize); - return sent; -} - - -int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to){ - int sent = 0; - int res; - struct pollfd fds={0}; - - fds.fd = sock; + fds[0].fd = sock; + if(monitorsock)fds[1].fd = *monitorsock; do { if(conf.timetoexit) return 0; - res = so._sendto(sock, (char *)buf + sent, bufsize - sent, 0, sin, SASIZE(sin)); + res = sin?so._sendto(sock, (char *)buf + sent, bufsize - sent, 0, sin, SASIZE(sin)):so._send(sock, (char *)buf + sent, bufsize - sent, 0); if(res < 0) { if(errno != EAGAIN && errno != EINTR) break; - fds.events = POLLOUT; + fds[0].events = POLLOUT; + fds[1].events = POLLIN; if(conf.timetoexit) return sent; - res = so._poll(&fds, 1, to); + res = so._poll(fds, monitorsock?2:1, to); if(res < 0 && (errno == EAGAIN || errno == EINTR)) continue; if(res < 1) break; + if(monitorsock && fds[1].revents){ + if(monaction == INVALID_SOCKET){ + so._closesocket(*monitorsock); + *monitorsock = INVALID_SOCKET; + monitorsock = NULL; + } + else return monaction; + } res = 0; } sent += res; @@ -53,22 +40,36 @@ int socksendto(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int return sent; } -int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to){ - struct pollfd fds={0}; +int sockrecvfrom(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET *monitorsock, int monaction){ + struct pollfd fds[2]={{0},{0}}; SASIZETYPE sasize; int res; - fds.fd = sock; + fds[0].fd = sock; + if(monitorsock)fds[1].fd = *monitorsock; do { + if(monitorsock && fds[1].revents){ + if(monaction == INVALID_SOCKET){ + so._closesocket(*monitorsock); + *monitorsock = INVALID_SOCKET; + monitorsock = NULL; + } + else return monaction; + } sasize = SASIZE(sin); res = so._recvfrom(sock, (char *)buf, bufsize, 0, (struct sockaddr *)sin, &sasize); if ((res >= 0) || (errno != EAGAIN && errno != EINTR) || conf.timetoexit) break; - fds.events = POLLIN; - res = so._poll(&fds, 1, to); + fds[0].events = POLLIN; + fds[0].events = POLLIN; + res = so._poll(fds, monitorsock?2:1, to); } while (res == 1 || (res < 0 && (errno == EAGAIN || errno == EINTR))); return res; } +int socksend(SOCKET sock, char * buf, int bufsize, int to){ + return socksendto(sock, NULL, buf, bufsize, to*1000, NULL, 0); +} + int sockgetcharcli(struct clientparam * param, int timeosec, int timeousec){ int len; @@ -81,7 +82,7 @@ int sockgetcharcli(struct clientparam * param, int timeosec, int timeousec){ return (int)param->clibuf[param->clioffset++]; } param->clioffset = param->cliinbuf = 0; - if ((len = sockrecvfrom(param->clisock, (struct sockaddr *)¶m->sincr, param->clibuf, param->clibufsize, timeosec*1000 + timeousec))<=0) return EOF; + if ((len = sockrecvfrom(param->clisock, (struct sockaddr *)¶m->sincr, param->clibuf, param->clibufsize, timeosec*1000 + timeousec, param->monitorsock, param->monaction))<=0) return EOF; param->cliinbuf = len; param->clioffset = 1; return (int)*param->clibuf; @@ -101,7 +102,7 @@ int sockfillbuffcli(struct clientparam * param, unsigned long size, int timeosec } if(size <= param->cliinbuf) return size; size -= param->cliinbuf; - if((len = sockrecvfrom(param->clisock, (struct sockaddr *)¶m->sincr, param->clibuf + param->cliinbuf, (param->clibufsize - param->cliinbuf) < size? param->clibufsize - param->cliinbuf:size, timeosec*1000)) > 0){ + if((len = sockrecvfrom(param->clisock, (struct sockaddr *)¶m->sincr, param->clibuf + param->cliinbuf, (param->clibufsize - param->cliinbuf) < size? param->clibufsize - param->cliinbuf:size, timeosec*1000, param->monitorsock, param->monaction)) > 0){ param->cliinbuf += len; } return param->cliinbuf; @@ -121,7 +122,7 @@ int sockfillbuffsrv(struct clientparam * param, unsigned long size, int timeosec } if(size <= param->srvinbuf) return size; size -= param->srvinbuf; - if((len = sockrecvfrom(param->remsock, (struct sockaddr *)¶m->sinsr, param->srvbuf + param->srvinbuf, (param->srvbufsize - param->srvinbuf) < size? param->srvbufsize - param->srvinbuf:size, timeosec*1000)) > 0){ + if((len = sockrecvfrom(param->remsock, (struct sockaddr *)¶m->sinsr, param->srvbuf + param->srvinbuf, (param->srvbufsize - param->srvinbuf) < size? param->srvbufsize - param->srvinbuf:size, timeosec*1000, param->monitorsock, param->monaction)) > 0){ param->srvinbuf += len; param->nreads++; param->statssrv64 += len; @@ -146,7 +147,7 @@ int sockgetcharsrv(struct clientparam * param, int timeosec, int timeousec){ return (int)param->srvbuf[param->srvoffset++]; } param->srvoffset = param->srvinbuf = 0; - if ((len = sockrecvfrom(param->remsock, (struct sockaddr *)¶m->sinsr, param->srvbuf, param->srvbufsize, timeosec*1000 + timeousec))<=0) return EOF; + if ((len = sockrecvfrom(param->remsock, (struct sockaddr *)¶m->sinsr, param->srvbuf, param->srvbufsize, timeosec*1000 + timeousec, param->monitorsock, param->monaction))<=0) return EOF; param->srvinbuf = len; param->srvoffset = 1; param->nreads++; diff --git a/src/sockmap.c b/src/sockmap.c index ddb0068..eb56789 100644 --- a/src/sockmap.c +++ b/src/sockmap.c @@ -167,11 +167,12 @@ log(logbuf); memset(fds, 0, sizeof(fds)); fds[0].fd = param->clisock; fds[1].fd = param->remsock; - fds[2].fd = param->monitorsock; + if(param->monitorsock)fds[2].fd = *param->monitorsock; fds[2].events = POLLIN; - if(param->monitorsock != INVALID_SOCKET) - so._poll(fds, param->monitorsock != INVALID_SOCKET? 3:2, sleeptime); - if(fds[2].revents)RETURN (93); + so._poll(fds, param->monitorsock? 3:2, sleeptime); + if(fds[2].revents){ + RETURN (93); + } sleeptime = 0; } if((param->srv->logdumpsrv && (param->statssrv64 > param->srv->logdumpsrv)) || @@ -440,13 +441,20 @@ log("done read from server to buf"); if(!after){ memset(fds, 0, sizeof(fds)); } - if(param->monitorsock != INVALID_SOCKET){ + if(param->monitorsock){ if(!after){ - fds[fdsc].fd = param->monitorsock; + fds[fdsc].fd = *param->monitorsock; fds[fdsc].events = POLLIN; fdsc++; } - else if(fds[fdsc].revents) RETURN(90); + else if(fds[fdsc].revents) { + if(param->monaction == INVALID_SOCKET){ + so._closesocket(*param->monitorsock); + *param->monitorsock = INVALID_SOCKET; + param->monitorsock = NULL; + } + else RETURN(param->monaction); + } } if(!CLIENTTERM){ if(!after){ diff --git a/src/socks.c b/src/socks.c index dd1a734..f25b555 100644 --- a/src/socks.c +++ b/src/socks.c @@ -398,7 +398,7 @@ fflush(stderr); sasize = sizeof(param->sinsr); if(len > (int)i){ - socksendto(param->remsock, (struct sockaddr *)¶m->sinsr, buf+i, len - i, conf.timeouts[SINGLEBYTE_L]*1000); + socksendto(param->remsock, (struct sockaddr *)¶m->sinsr, buf+i, len - i, conf.timeouts[SINGLEBYTE_L]*1000, ¶m->ctrlsock, INVALID_SOCKET); param->statscli64+=(len - i); param->nwrites++; #if SOCKSTRACE > 1 @@ -415,6 +415,7 @@ fprintf(stderr, "client address is assumed to be %s:%hu\n", fflush(stderr); #endif } + if(param->ctrlsock == INVALID_SOCKET) break; continue; } else if(len ==0 || (len <0 && errno != EINTR && errno != EAGAIN)){ @@ -446,7 +447,7 @@ fflush(stderr); memcpy(buf+4, SAADDR(¶m->sinsr), SAADDRLEN(¶m->sinsr)); memcpy(buf+4+SAADDRLEN(¶m->sinsr), SAPORT(¶m->sinsr), 2); sasize = sizeof(sin); - socksendto(param->clisock, (struct sockaddr *)&sin, buf, len + 6 + SAADDRLEN(¶m->sinsr), conf.timeouts[SINGLEBYTE_L]*1000); + socksendto(param->clisock, (struct sockaddr *)&sin, buf, len + 6 + SAADDRLEN(¶m->sinsr), conf.timeouts[SINGLEBYTE_L]*1000, ¶m->ctrlsock, INVALID_SOCKET); #if SOCKSTRACE > 1 fprintf(stderr, "UDP packet relayed to client from %hu size %d\n", ntohs(*SAPORT(¶m->sinsr)), @@ -455,6 +456,7 @@ fprintf(stderr, "UDP packet relayed to client from %hu size %d\n", fflush(stderr); #endif + if(param->ctrlsock == INVALID_SOCKET) break; continue; } fds[0].events = POLLIN; diff --git a/src/structures.h b/src/structures.h index 83d7dda..18a1338 100644 --- a/src/structures.h +++ b/src/structures.h @@ -528,8 +528,11 @@ struct clientparam { SOCKET clisock, remsock, ctrlsock, - ctrlsocksrv, monitorsock; + ctrlsocksrv; + SOCKET * monitorsock; + + int monaction; PROXYSERVICE service; REDIRTYPE redirtype; @@ -765,8 +768,8 @@ struct pluginlink { struct commands * commandhandlers; void * (*findbyname)(const char *name); int (*socksend)(SOCKET sock, char * buf, int bufsize, int to); - int (*socksendto)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to); - int (*sockrecvfrom)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to); + int (*socksendto)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET monsock, int monaction); + int (*sockrecvfrom)(SOCKET sock, struct sockaddr * sin, char * buf, int bufsize, int to, SOCKET monsock, int monaction); int (*sockgetcharcli)(struct clientparam * param, int timeosec, int timeousec); int (*sockgetcharsrv)(struct clientparam * param, int timeosec, int timeousec); int (*sockgetlinebuf)(struct clientparam * param, DIRECTION which, char * buf, int bufsize, int delim, int to); diff --git a/src/udppm.c b/src/udppm.c index 842cc59..3ef9167 100644 --- a/src/udppm.c +++ b/src/udppm.c @@ -48,7 +48,7 @@ void * udppmchild(struct clientparam* param) { RETURN (21); } param->cliinbuf = param->clioffset = 0; - i = sockrecvfrom(param->srv->srvsock, (struct sockaddr *)¶m->sincr, param->clibuf, param->clibufsize, 0); + i = sockrecvfrom(param->srv->srvsock, (struct sockaddr *)¶m->sincr, param->clibuf, param->clibufsize, 0, NULL, 0); if(i<=0){ param->srv->fds.events = POLLIN; RETURN (214);