support redis server connection keepalive and timeout

fix Issue #21
This commit is contained in:
fortrue 2018-02-23 16:47:03 +08:00
parent 1644a9bf1b
commit 86183a4a97
14 changed files with 162 additions and 19 deletions

View File

@ -5,9 +5,12 @@
## [MasterReadPriority [0-100]] #default 50 ## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0 ## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0 ## [DynamicSlaveReadPriority [0-100]] #default 0
## [RefreshInterval seconds] #default 1 ## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 1, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10 ## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout seconds] #default 1 ## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 120, server connection tcp keepalive
## Servers { ## Servers {
## + addr ## + addr
## ... ## ...
@ -21,8 +24,10 @@
# StaticSlaveReadPriority 50 # StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50 # DynamicSlaveReadPriority 50
# RefreshInterval 1 # RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10 # ServerFailureLimit 10
# ServerRetryTimeout 1 # ServerRetryTimeout 1
# KeepAlive 120
# Servers { # Servers {
# + 192.168.2.107:2211 # + 192.168.2.107:2211
# + 192.168.2.107:2212 # + 192.168.2.107:2212

View File

@ -9,9 +9,11 @@
## [MasterReadPriority [0-100]] #default 50 ## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0 ## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0 ## [DynamicSlaveReadPriority [0-100]] #default 0
## [RefreshInterval seconds] #default 1 ## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 1, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10 ## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout seconds] #default 1 ## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 120, server connection tcp keepalive
## Sentinels { ## Sentinels {
## + addr ## + addr
## ... ## ...
@ -33,8 +35,10 @@
# StaticSlaveReadPriority 50 # StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50 # DynamicSlaveReadPriority 50
# RefreshInterval 1 # RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10 # ServerFailureLimit 10
# ServerRetryTimeout 1 # ServerRetryTimeout 1
# KeepAlive 120
# Sentinels { # Sentinels {
# + 10.2.2.2 # + 10.2.2.2
# + 10.2.2.3 # + 10.2.2.3

View File

@ -49,8 +49,6 @@ Conf::Conf():
mLogSample[LogLevel::Notice] = 1; mLogSample[LogLevel::Notice] = 1;
mLogSample[LogLevel::Warn] = 1; mLogSample[LogLevel::Warn] = 1;
mLogSample[LogLevel::Error] = 1; mLogSample[LogLevel::Error] = 1;
mSentinelServerPool.refreshInterval = 1;
mClusterServerPool.refreshInterval = 1;
} }
Conf::~Conf() Conf::~Conf()
@ -249,11 +247,15 @@ bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
return true; return true;
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) { } else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
return true; return true;
} else if (setInt(sp.refreshInterval, "RefreshInterval", p, 1)) { } else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
return true;
} else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
return true; return true;
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) { } else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
return true; return true;
} else if (setInt(sp.serverRetryTimeout, "ServerRetryTimeout", p, 1)) { } else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
return true;
} else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
return true; return true;
} else if (setInt(sp.databases, "Databases", p, 1, 128)) { } else if (setInt(sp.databases, "Databases", p, 1, 128)) {
return true; return true;
@ -568,6 +570,27 @@ bool Conf::parseMemory(long& m, const char* str)
return m >= 0; return m >= 0;
} }
bool Conf::parseDuration(long& v, const char* str)
{
char u[4];
int c = sscanf(str, "%ld%3s", &v, u);
if (c == 2 && v > 0) {
if (strcasecmp(u, "s") == 0) {
v *= 1000000;
} else if (strcasecmp(u, "m") == 0 || strcasecmp(u, "ms") == 0) {
v *= 1000;
} else if (strcasecmp(u, "u") == 0 || strcasecmp(u, "us") == 0) {
} else {
return false;
}
} else if (c == 1) {
v *= 1000000;
} else {
return false;
}
return v >= 0;
}
bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n) bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n)
{ {
if (strcasecmp(name, n->key.c_str()) != 0) { if (strcasecmp(name, n->key.c_str()) != 0) {
@ -580,6 +603,18 @@ bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n)
return true; return true;
} }
bool Conf::setDuration(long& v, const char* name, const ConfParser::Node* n)
{
if (strcasecmp(name, n->key.c_str()) != 0) {
return false;
}
if (!parseDuration(v, n->val.c_str())) {
Throw(InvalidValue, "%s:%d %s invalid duration value \"%s\"",
n->file, n->line, name, n->val.c_str());
}
return true;
}
bool Conf::setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* p) bool Conf::setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* p)
{ {
if (strcasecmp(p->key.c_str(), name) != 0) { if (strcasecmp(p->key.c_str(), name) != 0) {

View File

@ -49,9 +49,11 @@ struct ServerPoolConf
int masterReadPriority = 50; int masterReadPriority = 50;
int staticSlaveReadPriority = 0; int staticSlaveReadPriority = 0;
int dynamicSlaveReadPriority = 0; int dynamicSlaveReadPriority = 0;
int refreshInterval = 1; //seconds long refreshInterval = 1000000; //us
long serverTimeout = 1000000; //us
int serverFailureLimit = 10; int serverFailureLimit = 10;
int serverRetryTimeout = 1; //seconds long serverRetryTimeout = 1000000; //us
int keepalive = 120; //seconds
int databases = 1; int databases = 1;
}; };
@ -180,6 +182,7 @@ public:
} }
public: public:
static bool parseMemory(long& m, const char* str); static bool parseMemory(long& m, const char* str);
static bool parseDuration(long& v, const char* str);
private: private:
void setGlobal(const ConfParser::Node* node); void setGlobal(const ConfParser::Node* node);
void setAuthority(const ConfParser::Node* node); void setAuthority(const ConfParser::Node* node);
@ -193,6 +196,7 @@ private:
bool setLong(long& attr, const char* name, const ConfParser::Node* n, long lower = LONG_MIN, long upper = LONG_MAX); bool setLong(long& attr, const char* name, const ConfParser::Node* n, long lower = LONG_MIN, long upper = LONG_MAX);
bool setBool(bool& attr, const char* name, const ConfParser::Node* n); bool setBool(bool& attr, const char* name, const ConfParser::Node* n);
bool setMemory(long& mem, const char* name, const ConfParser::Node* n); bool setMemory(long& mem, const char* name, const ConfParser::Node* n);
bool setDuration(long& v, const char* name, const ConfParser::Node* n);
bool setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* n); bool setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* n);
void setDC(DCConf& dc, const ConfParser::Node* n); void setDC(DCConf& dc, const ConfParser::Node* n);
void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n); void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n);

View File

@ -73,6 +73,11 @@ public:
{ {
return mSendRequests.size() + mSentRequests.size(); return mSendRequests.size() + mSentRequests.size();
} }
Request* frontRequest() const
{
return !mSentRequests.empty() ? mSentRequests.front() :
(!mSendRequests.empty() ? mSendRequests.front() : nullptr);
}
private: private:
void parse(Handler* h, Buffer* buf, int pos); void parse(Handler* h, Buffer* buf, int pos);
void handleResponse(Handler* h); void handleResponse(Handler* h);

View File

@ -30,12 +30,14 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
mHandler->id(), db, (int)mShareConns.size()); mHandler->id(), db, (int)mShareConns.size());
return nullptr; return nullptr;
} }
bool needInit = false;
ConnectConnection* c = mShareConns[db]; ConnectConnection* c = mShareConns[db];
if (!c) { if (!c) {
c = ConnectConnectionAlloc::create(mServ, true); c = ConnectConnectionAlloc::create(mServ, true);
c->setDb(db); c->setDb(db);
++mStats.connections; ++mStats.connections;
mShareConns[db] = c; mShareConns[db] = c;
needInit = true;
logNotice("h %d create server connection %s %d", logNotice("h %d create server connection %s %d",
mHandler->id(), c->peer(), c->fd()); mHandler->id(), c->peer(), c->fd());
} else if (c->fd() < 0) { } else if (c->fd() < 0) {
@ -43,16 +45,19 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
return nullptr; return nullptr;
} }
c->reopen(); c->reopen();
needInit = true;
logNotice("h %d reopen server connection %s %d", logNotice("h %d reopen server connection %s %d",
mHandler->id(), c->peer(), c->fd()); mHandler->id(), c->peer(), c->fd());
} else {
return c;
} }
if (!init(c)) { if (needInit && !init(c)) {
c->close(mHandler); c->close(mHandler);
return nullptr; return nullptr;
} }
if (mServ->fail() || c->isConnecting()) {
return nullptr;
}
return c; return c;
return mServ->fail() ? nullptr : c;
} }
ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db) ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db)
@ -91,7 +96,7 @@ ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db)
ccl.push_back(c); ccl.push_back(c);
return nullptr; return nullptr;
} }
return c; return mServ->fail() ? nullptr : c;
} }
void ConnectConnectionPool::putPrivateConnection(ConnectConnection* s) void ConnectConnectionPool::putPrivateConnection(ConnectConnection* s)
@ -118,6 +123,11 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
logWarn("h %d s %s %d settcpnodelay fail %s", logWarn("h %d s %s %d settcpnodelay fail %s",
mHandler->id(), c->peer(), c->fd(), StrError()); mHandler->id(), c->peer(), c->fd(), StrError());
} }
auto sp = mHandler->proxy()->serverPool();
if (sp->keepalive() > 0 && !c->setTcpKeepAlive(sp->keepalive())) {
logWarn("h %d s %s %d settcpkeepalive(%d) fail %s",
mHandler->id(), c->peer(), c->fd(), sp->keepalive(),StrError());
}
auto m = mHandler->eventLoop(); auto m = mHandler->eventLoop();
if (!m->addSocket(c, Multiplexor::ReadEvent|Multiplexor::WriteEvent)) { if (!m->addSocket(c, Multiplexor::ReadEvent|Multiplexor::WriteEvent)) {
logWarn("h %d s %s %d add to eventloop fail", logWarn("h %d s %s %d add to eventloop fail",
@ -141,7 +151,6 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
logDebug("h %d s %s %d auth req %ld", logDebug("h %d s %s %d auth req %ld",
mHandler->id(), c->peer(), c->fd(), req->id()); mHandler->id(), c->peer(), c->fd(), req->id());
} }
auto sp = mHandler->proxy()->serverPool();
if (sp->type() == ServerPool::Cluster) { if (sp->type() == ServerPool::Cluster) {
RequestPtr req = RequestAlloc::create(Request::Readonly); RequestPtr req = RequestAlloc::create(Request::Readonly);
mHandler->handleRequest(req, c); mHandler->handleRequest(req, c);

View File

@ -25,7 +25,8 @@ public:
enum StatusEnum enum StatusEnum
{ {
ParseError = Socket::CustomStatus, ParseError = Socket::CustomStatus,
LogicError LogicError,
TimeoutError
}; };
public: public:
Connection(); Connection();

View File

@ -86,6 +86,11 @@ public:
{ {
return node(obj)->next(Idx); return node(obj)->next(Idx);
} }
bool exist(T* obj) const
{
auto n = node(obj);
return n->prev(Idx) != nullptr || n->next(Idx) != nullptr || n == mHead;
}
void push_back(T* obj) void push_back(T* obj)
{ {
N* p = static_cast<N*>(obj); N* p = static_cast<N*>(obj);

View File

@ -62,6 +62,13 @@ void Handler::run()
} }
refreshServerPool(); refreshServerPool();
checkConnectionPool(); checkConnectionPool();
timeout = mProxy->serverPool()->serverTimeout();
if (timeout > 0) {
int num = checkServerTimeout(timeout);
if (num > 0) {
postEvent();
}
}
if (mStatsVer < mProxy->statsVer()) { if (mStatsVer < mProxy->statsVer()) {
resetStats(); resetStats();
} }
@ -103,6 +110,29 @@ void Handler::checkConnectionPool()
} }
} }
int Handler::checkServerTimeout(long timeout)
{
int num = 0;
auto now = Util::elapsedUSec();
auto n = mWaitConnectConns.front();
while (n) {
auto s = n;
n = mWaitConnectConns.next(n);
if (auto req = s->frontRequest()) {
long elapsed = now - req->createTime();
if (elapsed >= timeout) {
s->setStatus(Connection::TimeoutError);
addPostEvent(s, Multiplexor::ErrorEvent);
mWaitConnectConns.remove(s);
++num;
}
} else {
mWaitConnectConns.remove(s);
}
}
return num;
}
void Handler::handleEvent(Socket* s, int evts) void Handler::handleEvent(Socket* s, int evts)
{ {
FuncCallTimer(); FuncCallTimer();
@ -216,6 +246,10 @@ void Handler::postConnectConnectionEvent()
} }
if (!ret) { if (!ret) {
s->setStatus(Multiplexor::ErrorEvent); s->setStatus(Multiplexor::ErrorEvent);
} else {
if (s->isShared() && !mWaitConnectConns.exist(s)) {
mWaitConnectConns.push_back(s);
}
} }
} }
} }
@ -384,6 +418,8 @@ void Handler::handleConnectConnectionEvent(ConnectConnection* s, int evts)
if (s->good() && (evts & Multiplexor::WriteEvent)) { if (s->good() && (evts & Multiplexor::WriteEvent)) {
if (s->isConnecting()) { if (s->isConnecting()) {
s->setConnected(); s->setConnected();
logDebug("h %d s %s %d connected",
id(), s->peer(), s->fd());
} }
addPostEvent(s, Multiplexor::WriteEvent); addPostEvent(s, Multiplexor::WriteEvent);
} }

View File

@ -96,6 +96,7 @@ private:
void refreshServerPool(); void refreshServerPool();
void checkConnectionPool(); void checkConnectionPool();
int checkClientTimeout(long timeout); int checkClientTimeout(long timeout);
int checkServerTimeout(long timeout);
void innerResponse(ConnectConnection* c, Request* req, Response* res); void innerResponse(ConnectConnection* c, Request* req, Response* res);
void infoRequest(Request* req, const String& key); void infoRequest(Request* req, const String& key);
void infoLatencyRequest(Request* req); void infoLatencyRequest(Request* req);

View File

@ -18,9 +18,11 @@ void ServerPool::init(const ServerPoolConf& conf)
mMasterReadPriority = conf.masterReadPriority; mMasterReadPriority = conf.masterReadPriority;
mStaticSlaveReadPriority = conf.staticSlaveReadPriority; mStaticSlaveReadPriority = conf.staticSlaveReadPriority;
mDynamicSlaveReadPriority = conf.dynamicSlaveReadPriority; mDynamicSlaveReadPriority = conf.dynamicSlaveReadPriority;
mRefreshInterval = conf.refreshInterval * 1000000; mRefreshInterval = conf.refreshInterval;
mServerTimeout = conf.serverTimeout;
mServerFailureLimit = conf.serverFailureLimit; mServerFailureLimit = conf.serverFailureLimit;
mServerRetryTimeout = conf.serverRetryTimeout * 1000000; mServerRetryTimeout = conf.serverRetryTimeout;
mKeepAlive = conf.keepalive;
mDbNum = conf.databases; mDbNum = conf.databases;
} }

View File

@ -56,6 +56,10 @@ public:
{ {
return mRefreshInterval; return mRefreshInterval;
} }
long serverTimeout() const
{
return mServerTimeout;
}
int serverFailureLimit() const int serverFailureLimit() const
{ {
return mServerFailureLimit; return mServerFailureLimit;
@ -64,6 +68,10 @@ public:
{ {
return mServerRetryTimeout; return mServerRetryTimeout;
} }
int keepalive() const
{
return mKeepAlive;
}
int dbNum() const int dbNum() const
{ {
return mDbNum; return mDbNum;
@ -133,8 +141,10 @@ private:
int mStaticSlaveReadPriority; int mStaticSlaveReadPriority;
int mDynamicSlaveReadPriority; int mDynamicSlaveReadPriority;
long mRefreshInterval; long mRefreshInterval;
long mServerTimeout;
int mServerFailureLimit; int mServerFailureLimit;
long mServerRetryTimeout; long mServerRetryTimeout;
int mKeepAlive;
int mDbNum; int mDbNum;
}; };

View File

@ -155,6 +155,31 @@ bool Socket::setTcpNoDelay(bool val)
return ret == 0; return ret == 0;
} }
bool Socket::setTcpKeepAlive(int interval)
{
int val = 1;
int ret = setsockopt(mFd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
if (ret != 0) {
return false;
}
val = interval;
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
if (ret != 0) {
return false;
}
val = interval / 3;
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val));
if (ret != 0) {
return false;
}
val = 3;
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val));
if (ret != 0) {
return false;
}
return true;
}
int Socket::read(void* buf, int cnt) int Socket::read(void* buf, int cnt)
{ {
FuncCallTimer(); FuncCallTimer();

View File

@ -46,7 +46,7 @@ public:
EventError, EventError,
ExceptError, ExceptError,
CustomStatus CustomStatus = 100
}; };
public: public:
Socket(int fd = -1); Socket(int fd = -1);
@ -59,6 +59,7 @@ public:
void close(); void close();
bool setNonBlock(bool val = true); bool setNonBlock(bool val = true);
bool setTcpNoDelay(bool val = true); bool setTcpNoDelay(bool val = true);
bool setTcpKeepAlive(int interval);
int read(void* buf, int cnt); int read(void* buf, int cnt);
int write(const void* buf, int cnt); int write(const void* buf, int cnt);
int writev(const struct iovec* vecs, int cnt); int writev(const struct iovec* vecs, int cnt);