fix server connection leak for list block command

This commit is contained in:
fortrue 2017-09-06 17:09:16 +08:00
parent 017ae6461b
commit 1569b57bd7
3 changed files with 10 additions and 31 deletions

View File

@ -107,24 +107,6 @@ void ConnectConnectionPool::putPrivateConnection(ConnectConnection* s)
} }
} }
void ConnectConnectionPool::putTransactionConnection(ConnectConnection* s, bool inWatch, bool inMulti)
{
if (s->good()) {
if (inMulti) {
RequestPtr req = RequestAlloc::create(Request::DiscardServ);
mHandler->handleRequest(req, s);
logDebug("h %d s %s %d discard req %ld",
mHandler->id(), s->peer(), s->fd(), req->id());
} else if (inWatch) {
RequestPtr req = RequestAlloc::create(Request::UnwatchServ);
mHandler->handleRequest(req, s);
logDebug("h %d s %s %d unwatch req %ld",
mHandler->id(), s->peer(), s->fd(), req->id());
}
}
putPrivateConnection(s);
}
bool ConnectConnectionPool::init(ConnectConnection* c) bool ConnectConnectionPool::init(ConnectConnection* c)
{ {
if (!c->setNonBlock()) { if (!c->setNonBlock()) {

View File

@ -24,7 +24,6 @@ public:
ConnectConnection* getShareConnection(int db=0); ConnectConnection* getShareConnection(int db=0);
ConnectConnection* getPrivateConnection(int db=0); ConnectConnection* getPrivateConnection(int db=0);
void putPrivateConnection(ConnectConnection* s); void putPrivateConnection(ConnectConnection* s);
void putTransactionConnection(ConnectConnection* s, bool inWatch, bool inMulti);
void check(); void check();
Server* server() const Server* server() const
{ {

View File

@ -180,15 +180,9 @@ void Handler::postAcceptConnectionEvent()
mEventLoop->delSocket(c); mEventLoop->delSocket(c);
if (auto s = c->connectConnection()) { if (auto s = c->connectConnection()) {
auto cp = mConnPool[s->server()->id()]; auto cp = mConnPool[s->server()->id()];
if (c->inTransaction()) { s->setStatus(Connection::LogicError);
cp->putTransactionConnection(s, c->inPendWatch(), c->inPendMulti()); addPostEvent(s, Multiplexor::ErrorEvent);
} else if (c->inSub(true)) { cp->putPrivateConnection(s);
cp->putPrivateConnection(s);
s->setStatus(Connection::LogicError);
addPostEvent(s, Multiplexor::ErrorEvent);
} else {
cp->putPrivateConnection(s);
}
c->detachConnectConnection(); c->detachConnectConnection();
s->detachAcceptConnection(); s->detachAcceptConnection();
} }
@ -644,6 +638,10 @@ void Handler::postHandleRequest(Request* req, ConnectConnection* s)
case Command::Blpop: case Command::Blpop:
case Command::Brpop: case Command::Brpop:
case Command::Brpoplpush: case Command::Brpoplpush:
c->setBlockRequest(true);
c->attachConnectConnection(s);
s->attachAcceptConnection(c);
break;
case Command::Unwatch: case Command::Unwatch:
case Command::Exec: case Command::Exec:
case Command::Discard: case Command::Discard:
@ -854,11 +852,11 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
default: default:
break; break;
} }
if (auto cs = c->connectConnection()) { if (s && !s->isShared()) {
if (!c->inTransaction() && !c->inSub(true)) { if (!c->inTransaction() && !c->inSub(true)) {
mConnPool[cs->server()->id()]->putPrivateConnection(cs); mConnPool[s->server()->id()]->putPrivateConnection(s);
c->detachConnectConnection(); c->detachConnectConnection();
cs->detachAcceptConnection(); s->detachAcceptConnection();
} }
} }
} }