notes on ClusterServerPool.cpp

This commit is contained in:
Iwan BK 2022-04-08 08:26:09 +07:00
parent 2380a4f252
commit b8ca3d67de
3 changed files with 21 additions and 6 deletions

View File

@ -95,7 +95,7 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
} }
while (true) { while (true) {
ClusterNodesParser::Status st = p.parse(); ClusterNodesParser::Status st = p.parse();
if (st == ClusterNodesParser::Node) { if (st == ClusterNodesParser::Node) { // it is a node
logDebug("redis cluster update parse node %s %s %s %s", logDebug("redis cluster update parse node %s %s %s %s",
p.nodeId().data(), p.nodeId().data(),
p.addr().data(), p.addr().data(),
@ -112,8 +112,11 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
String addr(p.addr()); String addr(p.addr());
auto it = mServs.find(addr); auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second; Server* serv = it == mServs.end() ? nullptr : it->second;
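// the raw address is not in our server list; try these fallbacks:
// - the node is flagged `myself` -> use the server of the current connection
// - the address contains an `@` separator -> look up the part before the `@`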
if (!serv) { if (!serv) {
if (strstr(p.flags().data(), "myself")) { if (strstr(p.flags().data(), "myself")) { // the node is where the command get executed
serv = s->server(); serv = s->server();
} else if (const char* t = strchr(p.addr().data(), '@')) { } else if (const char* t = strchr(p.addr().data(), '@')) {
addr = String(p.addr().data(), t - p.addr().data()); addr = String(p.addr().data(), t - p.addr().data());
@ -121,11 +124,15 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
serv = it == mServs.end() ? nullptr : it->second; serv = it == mServs.end() ? nullptr : it->second;
} }
} }
// still not found in our server list, check further:
// - unknown role || fail || handshake -> ignore
// - number of servers already exceeds the pool capacity -> ignore it
// otherwise, create a new server object and add it to mServPool & mServs
if (!serv) { if (!serv) {
const char* flags = p.flags().data(); const char* flags = p.flags().data();
if (p.role() == Server::Unknown || if (p.role() == Server::Unknown ||
strstr(flags, "fail") || strstr(flags, "handshake")) { strstr(flags, "fail") || strstr(flags, "handshake")) {
logWarn("redis cluster nodes get node abnormal %s %s %s %s", logNotice("redis cluster nodes get node abnormal %s %s %s %s",
p.nodeId().data(), p.nodeId().data(),
p.addr().data(), p.addr().data(),
p.flags().data(), p.flags().data(),
@ -153,6 +160,9 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
serv->setRole(p.role()); serv->setRole(p.role());
serv->setName(p.nodeId()); serv->setName(p.nodeId());
serv->setMasterName(p.master()); serv->setMasterName(p.master());
// if it is master
// - add group if needed
// - add the slot
if (p.role() == Server::Master) { if (p.role() == Server::Master) {
ServerGroup* g = getGroup(p.nodeId()); ServerGroup* g = getGroup(p.nodeId());
if (!g) { if (!g) {
@ -189,6 +199,8 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
return; return;
} }
} }
// assign to proper group
// there is no health check here
for (auto serv : mServPool) { for (auto serv : mServPool) {
if (serv->updating()) { if (serv->updating()) {
serv->setUpdating(false); serv->setUpdating(false);

View File

@ -259,7 +259,7 @@ void Handler::postConnectConnectionEvent()
case Socket::EventError: case Socket::EventError:
{ {
Server* serv = s->server(); Server* serv = s->server();
serv->incrFail(); serv->incrFail(); // increase fail counter only for EventError
if (serv->fail()) { if (serv->fail()) {
logNotice("server %s mark failure", serv->addr().data()); logNotice("server %s mark failure", serv->addr().data());
} }
@ -1368,7 +1368,7 @@ void Handler::innerResponse(ConnectConnection* s, Request* req, Response* res)
if (s && res->isPong()) { if (s && res->isPong()) {
Server* serv = s->server(); Server* serv = s->server();
if (serv->fail()) { if (serv->fail()) {
serv->setFail(false); serv->setFail(false); // if server respond to ping, mark fail as false. TODO: add for other case like loading
logNotice("h %d s %s %d mark server alive", logNotice("h %d s %s %d mark server alive",
id(), s->peer(), s->fd()); id(), s->peer(), s->fd());
} }
@ -1382,7 +1382,7 @@ void Handler::innerResponse(ConnectConnection* s, Request* req, Response* res)
id(), s->peer(), s->fd()); id(), s->peer(), s->fd());
} }
break; break;
case Command::Readonly: case Command::Readonly: // when loading, predixy can detect it here
if (!res->isOk()) { if (!res->isOk()) {
s->setStatus(ConnectConnection::LogicError); s->setStatus(ConnectConnection::LogicError);
addPostEvent(s, Multiplexor::ErrorEvent); addPostEvent(s, Multiplexor::ErrorEvent);

View File

@ -47,10 +47,13 @@ bool Server::activate()
return AtomicCAS(mNextActivateTime, v, now + mPool->serverRetryTimeout()); return AtomicCAS(mNextActivateTime, v, now + mPool->serverRetryTimeout());
} }
// increment the failure counter of the server
// and mark the server as failed each time the counter reaches a multiple of the pool's failure limit
void Server::incrFail() void Server::incrFail()
{ {
long cnt = ++mFailureCnt; long cnt = ++mFailureCnt;
if (cnt % mPool->serverFailureLimit() == 0) { if (cnt % mPool->serverFailureLimit() == 0) {
logError("[ibk]SET FAIL TRUE");
setFail(true); setFail(true);
} }
} }