mirror of
https://github.com/joyieldInc/predixy.git
synced 2025-12-24 14:36:42 +08:00
parent
1644a9bf1b
commit
86183a4a97
@ -5,9 +5,12 @@
|
||||
## [MasterReadPriority [0-100]] #default 50
|
||||
## [StaticSlaveReadPriority [0-100]] #default 0
|
||||
## [DynamicSlaveReadPriority [0-100]] #default 0
|
||||
## [RefreshInterval seconds] #default 1
|
||||
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
||||
## [ServerTimeout number[s|ms|us]] #default 1, server connection socket read/write timeout
|
||||
## [ServerFailureLimit number] #default 10
|
||||
## [ServerRetryTimeout seconds] #default 1
|
||||
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
||||
## [KeepAlive seconds] #default 120, server connection tcp keepalive
|
||||
|
||||
## Servers {
|
||||
## + addr
|
||||
## ...
|
||||
@ -21,8 +24,10 @@
|
||||
# StaticSlaveReadPriority 50
|
||||
# DynamicSlaveReadPriority 50
|
||||
# RefreshInterval 1
|
||||
# ServerTimeout 1
|
||||
# ServerFailureLimit 10
|
||||
# ServerRetryTimeout 1
|
||||
# KeepAlive 120
|
||||
# Servers {
|
||||
# + 192.168.2.107:2211
|
||||
# + 192.168.2.107:2212
|
||||
|
||||
@ -9,9 +9,11 @@
|
||||
## [MasterReadPriority [0-100]] #default 50
|
||||
## [StaticSlaveReadPriority [0-100]] #default 0
|
||||
## [DynamicSlaveReadPriority [0-100]] #default 0
|
||||
## [RefreshInterval seconds] #default 1
|
||||
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
||||
## [ServerTimeout number[s|ms|us]] #default 1, server connection socket read/write timeout
|
||||
## [ServerFailureLimit number] #default 10
|
||||
## [ServerRetryTimeout seconds] #default 1
|
||||
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
||||
## [KeepAlive seconds] #default 120, server connection tcp keepalive
|
||||
## Sentinels {
|
||||
## + addr
|
||||
## ...
|
||||
@ -33,8 +35,10 @@
|
||||
# StaticSlaveReadPriority 50
|
||||
# DynamicSlaveReadPriority 50
|
||||
# RefreshInterval 1
|
||||
# ServerTimeout 1
|
||||
# ServerFailureLimit 10
|
||||
# ServerRetryTimeout 1
|
||||
# KeepAlive 120
|
||||
# Sentinels {
|
||||
# + 10.2.2.2
|
||||
# + 10.2.2.3
|
||||
|
||||
43
src/Conf.cpp
43
src/Conf.cpp
@ -49,8 +49,6 @@ Conf::Conf():
|
||||
mLogSample[LogLevel::Notice] = 1;
|
||||
mLogSample[LogLevel::Warn] = 1;
|
||||
mLogSample[LogLevel::Error] = 1;
|
||||
mSentinelServerPool.refreshInterval = 1;
|
||||
mClusterServerPool.refreshInterval = 1;
|
||||
}
|
||||
|
||||
Conf::~Conf()
|
||||
@ -249,11 +247,15 @@ bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
|
||||
return true;
|
||||
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
|
||||
return true;
|
||||
} else if (setInt(sp.refreshInterval, "RefreshInterval", p, 1)) {
|
||||
} else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
|
||||
return true;
|
||||
} else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
|
||||
return true;
|
||||
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
|
||||
return true;
|
||||
} else if (setInt(sp.serverRetryTimeout, "ServerRetryTimeout", p, 1)) {
|
||||
} else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
|
||||
return true;
|
||||
} else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
|
||||
return true;
|
||||
} else if (setInt(sp.databases, "Databases", p, 1, 128)) {
|
||||
return true;
|
||||
@ -568,6 +570,27 @@ bool Conf::parseMemory(long& m, const char* str)
|
||||
return m >= 0;
|
||||
}
|
||||
|
||||
bool Conf::parseDuration(long& v, const char* str)
|
||||
{
|
||||
char u[4];
|
||||
int c = sscanf(str, "%ld%3s", &v, u);
|
||||
if (c == 2 && v > 0) {
|
||||
if (strcasecmp(u, "s") == 0) {
|
||||
v *= 1000000;
|
||||
} else if (strcasecmp(u, "m") == 0 || strcasecmp(u, "ms") == 0) {
|
||||
v *= 1000;
|
||||
} else if (strcasecmp(u, "u") == 0 || strcasecmp(u, "us") == 0) {
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else if (c == 1) {
|
||||
v *= 1000000;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return v >= 0;
|
||||
}
|
||||
|
||||
bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n)
|
||||
{
|
||||
if (strcasecmp(name, n->key.c_str()) != 0) {
|
||||
@ -580,6 +603,18 @@ bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n)
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Conf::setDuration(long& v, const char* name, const ConfParser::Node* n)
|
||||
{
|
||||
if (strcasecmp(name, n->key.c_str()) != 0) {
|
||||
return false;
|
||||
}
|
||||
if (!parseDuration(v, n->val.c_str())) {
|
||||
Throw(InvalidValue, "%s:%d %s invalid duration value \"%s\"",
|
||||
n->file, n->line, name, n->val.c_str());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Conf::setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* p)
|
||||
{
|
||||
if (strcasecmp(p->key.c_str(), name) != 0) {
|
||||
|
||||
@ -49,9 +49,11 @@ struct ServerPoolConf
|
||||
int masterReadPriority = 50;
|
||||
int staticSlaveReadPriority = 0;
|
||||
int dynamicSlaveReadPriority = 0;
|
||||
int refreshInterval = 1; //seconds
|
||||
long refreshInterval = 1000000; //us
|
||||
long serverTimeout = 1000000; //us
|
||||
int serverFailureLimit = 10;
|
||||
int serverRetryTimeout = 1; //seconds
|
||||
long serverRetryTimeout = 1000000; //us
|
||||
int keepalive = 120; //seconds
|
||||
int databases = 1;
|
||||
};
|
||||
|
||||
@ -180,6 +182,7 @@ public:
|
||||
}
|
||||
public:
|
||||
static bool parseMemory(long& m, const char* str);
|
||||
static bool parseDuration(long& v, const char* str);
|
||||
private:
|
||||
void setGlobal(const ConfParser::Node* node);
|
||||
void setAuthority(const ConfParser::Node* node);
|
||||
@ -193,6 +196,7 @@ private:
|
||||
bool setLong(long& attr, const char* name, const ConfParser::Node* n, long lower = LONG_MIN, long upper = LONG_MAX);
|
||||
bool setBool(bool& attr, const char* name, const ConfParser::Node* n);
|
||||
bool setMemory(long& mem, const char* name, const ConfParser::Node* n);
|
||||
bool setDuration(long& v, const char* name, const ConfParser::Node* n);
|
||||
bool setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* n);
|
||||
void setDC(DCConf& dc, const ConfParser::Node* n);
|
||||
void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n);
|
||||
|
||||
@ -73,6 +73,11 @@ public:
|
||||
{
|
||||
return mSendRequests.size() + mSentRequests.size();
|
||||
}
|
||||
Request* frontRequest() const
|
||||
{
|
||||
return !mSentRequests.empty() ? mSentRequests.front() :
|
||||
(!mSendRequests.empty() ? mSendRequests.front() : nullptr);
|
||||
}
|
||||
private:
|
||||
void parse(Handler* h, Buffer* buf, int pos);
|
||||
void handleResponse(Handler* h);
|
||||
|
||||
@ -30,12 +30,14 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
|
||||
mHandler->id(), db, (int)mShareConns.size());
|
||||
return nullptr;
|
||||
}
|
||||
bool needInit = false;
|
||||
ConnectConnection* c = mShareConns[db];
|
||||
if (!c) {
|
||||
c = ConnectConnectionAlloc::create(mServ, true);
|
||||
c->setDb(db);
|
||||
++mStats.connections;
|
||||
mShareConns[db] = c;
|
||||
needInit = true;
|
||||
logNotice("h %d create server connection %s %d",
|
||||
mHandler->id(), c->peer(), c->fd());
|
||||
} else if (c->fd() < 0) {
|
||||
@ -43,16 +45,19 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
|
||||
return nullptr;
|
||||
}
|
||||
c->reopen();
|
||||
needInit = true;
|
||||
logNotice("h %d reopen server connection %s %d",
|
||||
mHandler->id(), c->peer(), c->fd());
|
||||
} else {
|
||||
return c;
|
||||
}
|
||||
if (!init(c)) {
|
||||
if (needInit && !init(c)) {
|
||||
c->close(mHandler);
|
||||
return nullptr;
|
||||
}
|
||||
if (mServ->fail() || c->isConnecting()) {
|
||||
return nullptr;
|
||||
}
|
||||
return c;
|
||||
return mServ->fail() ? nullptr : c;
|
||||
}
|
||||
|
||||
ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db)
|
||||
@ -91,7 +96,7 @@ ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db)
|
||||
ccl.push_back(c);
|
||||
return nullptr;
|
||||
}
|
||||
return c;
|
||||
return mServ->fail() ? nullptr : c;
|
||||
}
|
||||
|
||||
void ConnectConnectionPool::putPrivateConnection(ConnectConnection* s)
|
||||
@ -118,6 +123,11 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
|
||||
logWarn("h %d s %s %d settcpnodelay fail %s",
|
||||
mHandler->id(), c->peer(), c->fd(), StrError());
|
||||
}
|
||||
auto sp = mHandler->proxy()->serverPool();
|
||||
if (sp->keepalive() > 0 && !c->setTcpKeepAlive(sp->keepalive())) {
|
||||
logWarn("h %d s %s %d settcpkeepalive(%d) fail %s",
|
||||
mHandler->id(), c->peer(), c->fd(), sp->keepalive(),StrError());
|
||||
}
|
||||
auto m = mHandler->eventLoop();
|
||||
if (!m->addSocket(c, Multiplexor::ReadEvent|Multiplexor::WriteEvent)) {
|
||||
logWarn("h %d s %s %d add to eventloop fail",
|
||||
@ -141,7 +151,6 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
|
||||
logDebug("h %d s %s %d auth req %ld",
|
||||
mHandler->id(), c->peer(), c->fd(), req->id());
|
||||
}
|
||||
auto sp = mHandler->proxy()->serverPool();
|
||||
if (sp->type() == ServerPool::Cluster) {
|
||||
RequestPtr req = RequestAlloc::create(Request::Readonly);
|
||||
mHandler->handleRequest(req, c);
|
||||
|
||||
@ -25,7 +25,8 @@ public:
|
||||
enum StatusEnum
|
||||
{
|
||||
ParseError = Socket::CustomStatus,
|
||||
LogicError
|
||||
LogicError,
|
||||
TimeoutError
|
||||
};
|
||||
public:
|
||||
Connection();
|
||||
|
||||
@ -86,6 +86,11 @@ public:
|
||||
{
|
||||
return node(obj)->next(Idx);
|
||||
}
|
||||
bool exist(T* obj) const
|
||||
{
|
||||
auto n = node(obj);
|
||||
return n->prev(Idx) != nullptr || n->next(Idx) != nullptr || n == mHead;
|
||||
}
|
||||
void push_back(T* obj)
|
||||
{
|
||||
N* p = static_cast<N*>(obj);
|
||||
|
||||
@ -62,6 +62,13 @@ void Handler::run()
|
||||
}
|
||||
refreshServerPool();
|
||||
checkConnectionPool();
|
||||
timeout = mProxy->serverPool()->serverTimeout();
|
||||
if (timeout > 0) {
|
||||
int num = checkServerTimeout(timeout);
|
||||
if (num > 0) {
|
||||
postEvent();
|
||||
}
|
||||
}
|
||||
if (mStatsVer < mProxy->statsVer()) {
|
||||
resetStats();
|
||||
}
|
||||
@ -103,6 +110,29 @@ void Handler::checkConnectionPool()
|
||||
}
|
||||
}
|
||||
|
||||
int Handler::checkServerTimeout(long timeout)
|
||||
{
|
||||
int num = 0;
|
||||
auto now = Util::elapsedUSec();
|
||||
auto n = mWaitConnectConns.front();
|
||||
while (n) {
|
||||
auto s = n;
|
||||
n = mWaitConnectConns.next(n);
|
||||
if (auto req = s->frontRequest()) {
|
||||
long elapsed = now - req->createTime();
|
||||
if (elapsed >= timeout) {
|
||||
s->setStatus(Connection::TimeoutError);
|
||||
addPostEvent(s, Multiplexor::ErrorEvent);
|
||||
mWaitConnectConns.remove(s);
|
||||
++num;
|
||||
}
|
||||
} else {
|
||||
mWaitConnectConns.remove(s);
|
||||
}
|
||||
}
|
||||
return num;
|
||||
}
|
||||
|
||||
void Handler::handleEvent(Socket* s, int evts)
|
||||
{
|
||||
FuncCallTimer();
|
||||
@ -216,6 +246,10 @@ void Handler::postConnectConnectionEvent()
|
||||
}
|
||||
if (!ret) {
|
||||
s->setStatus(Multiplexor::ErrorEvent);
|
||||
} else {
|
||||
if (s->isShared() && !mWaitConnectConns.exist(s)) {
|
||||
mWaitConnectConns.push_back(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -384,6 +418,8 @@ void Handler::handleConnectConnectionEvent(ConnectConnection* s, int evts)
|
||||
if (s->good() && (evts & Multiplexor::WriteEvent)) {
|
||||
if (s->isConnecting()) {
|
||||
s->setConnected();
|
||||
logDebug("h %d s %s %d connected",
|
||||
id(), s->peer(), s->fd());
|
||||
}
|
||||
addPostEvent(s, Multiplexor::WriteEvent);
|
||||
}
|
||||
|
||||
@ -96,6 +96,7 @@ private:
|
||||
void refreshServerPool();
|
||||
void checkConnectionPool();
|
||||
int checkClientTimeout(long timeout);
|
||||
int checkServerTimeout(long timeout);
|
||||
void innerResponse(ConnectConnection* c, Request* req, Response* res);
|
||||
void infoRequest(Request* req, const String& key);
|
||||
void infoLatencyRequest(Request* req);
|
||||
|
||||
@ -18,9 +18,11 @@ void ServerPool::init(const ServerPoolConf& conf)
|
||||
mMasterReadPriority = conf.masterReadPriority;
|
||||
mStaticSlaveReadPriority = conf.staticSlaveReadPriority;
|
||||
mDynamicSlaveReadPriority = conf.dynamicSlaveReadPriority;
|
||||
mRefreshInterval = conf.refreshInterval * 1000000;
|
||||
mRefreshInterval = conf.refreshInterval;
|
||||
mServerTimeout = conf.serverTimeout;
|
||||
mServerFailureLimit = conf.serverFailureLimit;
|
||||
mServerRetryTimeout = conf.serverRetryTimeout * 1000000;
|
||||
mServerRetryTimeout = conf.serverRetryTimeout;
|
||||
mKeepAlive = conf.keepalive;
|
||||
mDbNum = conf.databases;
|
||||
}
|
||||
|
||||
|
||||
@ -56,6 +56,10 @@ public:
|
||||
{
|
||||
return mRefreshInterval;
|
||||
}
|
||||
long serverTimeout() const
|
||||
{
|
||||
return mServerTimeout;
|
||||
}
|
||||
int serverFailureLimit() const
|
||||
{
|
||||
return mServerFailureLimit;
|
||||
@ -64,6 +68,10 @@ public:
|
||||
{
|
||||
return mServerRetryTimeout;
|
||||
}
|
||||
int keepalive() const
|
||||
{
|
||||
return mKeepAlive;
|
||||
}
|
||||
int dbNum() const
|
||||
{
|
||||
return mDbNum;
|
||||
@ -133,8 +141,10 @@ private:
|
||||
int mStaticSlaveReadPriority;
|
||||
int mDynamicSlaveReadPriority;
|
||||
long mRefreshInterval;
|
||||
long mServerTimeout;
|
||||
int mServerFailureLimit;
|
||||
long mServerRetryTimeout;
|
||||
int mKeepAlive;
|
||||
int mDbNum;
|
||||
};
|
||||
|
||||
|
||||
@ -155,6 +155,31 @@ bool Socket::setTcpNoDelay(bool val)
|
||||
return ret == 0;
|
||||
}
|
||||
|
||||
bool Socket::setTcpKeepAlive(int interval)
|
||||
{
|
||||
int val = 1;
|
||||
int ret = setsockopt(mFd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
|
||||
if (ret != 0) {
|
||||
return false;
|
||||
}
|
||||
val = interval;
|
||||
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
|
||||
if (ret != 0) {
|
||||
return false;
|
||||
}
|
||||
val = interval / 3;
|
||||
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val));
|
||||
if (ret != 0) {
|
||||
return false;
|
||||
}
|
||||
val = 3;
|
||||
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val));
|
||||
if (ret != 0) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
int Socket::read(void* buf, int cnt)
|
||||
{
|
||||
FuncCallTimer();
|
||||
|
||||
@ -46,7 +46,7 @@ public:
|
||||
EventError,
|
||||
ExceptError,
|
||||
|
||||
CustomStatus
|
||||
CustomStatus = 100
|
||||
};
|
||||
public:
|
||||
Socket(int fd = -1);
|
||||
@ -59,6 +59,7 @@ public:
|
||||
void close();
|
||||
bool setNonBlock(bool val = true);
|
||||
bool setTcpNoDelay(bool val = true);
|
||||
bool setTcpKeepAlive(int interval);
|
||||
int read(void* buf, int cnt);
|
||||
int write(const void* buf, int cnt);
|
||||
int writev(const struct iovec* vecs, int cnt);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user